summaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/CommandComplete.c94
-rw-r--r--src/protocol/child.c161
-rw-r--r--src/protocol/pool_connection_pool.c32
-rw-r--r--src/protocol/pool_pg_utils.c94
-rw-r--r--src/protocol/pool_process_query.c318
-rw-r--r--src/protocol/pool_proto2.c40
-rw-r--r--src/protocol/pool_proto_modules.c451
7 files changed, 621 insertions, 569 deletions
diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c
index b71678155..ef144ca31 100644
--- a/src/protocol/CommandComplete.c
+++ b/src/protocol/CommandComplete.c
@@ -40,15 +40,15 @@
#include "utils/pool_stream.h"
static int extract_ntuples(char *message);
-static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *packet, int packetlen, bool command_complete);
-static int forward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen);
-static int forward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen);
-static int forward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, int packetlen);
-static void process_clear_cache(POOL_CONNECTION_POOL * backend);
+static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete);
+static int forward_command_complete(POOL_CONNECTION *frontend, char *packet, int packetlen);
+static int forward_empty_query(POOL_CONNECTION *frontend, char *packet, int packetlen);
+static int forward_packet_to_frontend(POOL_CONNECTION *frontend, char kind, char *packet, int packetlen);
+static void process_clear_cache(POOL_CONNECTION_POOL *backend);
static bool check_alter_role_statement(AlterRoleStmt *stmt);
POOL_STATUS
-CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool command_complete)
+CommandComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, bool command_complete)
{
int len,
len1;
@@ -230,9 +230,9 @@ CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool
state = TSTATE(backend, MAIN_NODE_ID);
/*
- * If some rows have been fetched by an execute with non 0 row option,
- * we do not create cache.
- */
+ * If some rows have been fetched by an execute with non 0 row
+ * option, we do not create cache.
+ */
pool_handle_query_cache(backend, query, node, state,
session_context->query_context->partial_fetch);
@@ -276,7 +276,7 @@ CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool
if (can_query_context_destroy(session_context->query_context))
{
- POOL_SENT_MESSAGE * msg = pool_get_sent_message_by_query_context(session_context->query_context);
+ POOL_SENT_MESSAGE *msg = pool_get_sent_message_by_query_context(session_context->query_context);
if (!msg || (msg && *msg->name == '\0'))
{
@@ -294,7 +294,7 @@ CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool
* Handle misc process which is necessary when query context exists.
*/
void
-handle_query_context(POOL_CONNECTION_POOL * backend)
+handle_query_context(POOL_CONNECTION_POOL *backend)
{
POOL_SESSION_CONTEXT *session_context;
Node *node;
@@ -344,13 +344,13 @@ handle_query_context(POOL_CONNECTION_POOL * backend)
}
/*
- * JDBC driver sends "BEGIN" query internally if setAutoCommit(false).
- * But it does not send Sync message after "BEGIN" query. In extended
- * query protocol, PostgreSQL returns ReadyForQuery when a client sends
- * Sync message. Problem is, pgpool can't know the transaction state
- * without receiving ReadyForQuery. So we remember that we need to send
- * Sync message internally afterward, whenever we receive BEGIN in
- * extended protocol.
+ * JDBC driver sends "BEGIN" query internally if setAutoCommit(false). But
+ * it does not send Sync message after "BEGIN" query. In extended query
+ * protocol, PostgreSQL returns ReadyForQuery when a client sends Sync
+ * message. Problem is, pgpool can't know the transaction state without
+ * receiving ReadyForQuery. So we remember that we need to send Sync
+ * message internally afterward, whenever we receive BEGIN in extended
+ * protocol.
*/
else if (IsA(node, TransactionStmt))
{
@@ -374,10 +374,10 @@ handle_query_context(POOL_CONNECTION_POOL * backend)
pool_unset_failed_transaction();
pool_unset_transaction_isolation();
}
- else if (stmt->kind == TRANS_STMT_COMMIT)
+ else if (stmt->kind == TRANS_STMT_COMMIT)
{
/* Commit ongoing CREATE/DROP temp table status */
- pool_temp_tables_commit_pending();
+ pool_temp_tables_commit_pending();
/* Forget a transaction was started by multi statement query */
unset_tx_started_by_multi_statement_query();
@@ -412,12 +412,13 @@ handle_query_context(POOL_CONNECTION_POOL * backend)
else if (IsA(node, CreateStmt))
{
CreateStmt *stmt = (CreateStmt *) node;
- POOL_TEMP_TABLE_STATE state;
+ POOL_TEMP_TABLE_STATE state;
/* Is this a temporary table? */
if (stmt->relation->relpersistence == 't')
{
- if (TSTATE(backend, MAIN_NODE_ID ) == 'T') /* Are we inside a transaction? */
+ if (TSTATE(backend, MAIN_NODE_ID) == 'T') /* Are we inside a
+ * transaction? */
{
state = TEMP_TABLE_CREATING;
}
@@ -433,8 +434,8 @@ handle_query_context(POOL_CONNECTION_POOL * backend)
}
else if (IsA(node, DropStmt))
{
- DropStmt *stmt = (DropStmt *) node;
- POOL_TEMP_TABLE_STATE state;
+ DropStmt *stmt = (DropStmt *) node;
+ POOL_TEMP_TABLE_STATE state;
if (stmt->removeType == OBJECT_TABLE)
{
@@ -442,7 +443,8 @@ handle_query_context(POOL_CONNECTION_POOL * backend)
ListCell *cell;
ListCell *next;
- if (TSTATE(backend, MAIN_NODE_ID ) == 'T') /* Are we inside a transaction? */
+ if (TSTATE(backend, MAIN_NODE_ID) == 'T') /* Are we inside a
+ * transaction? */
{
state = TEMP_TABLE_DROPPING;
}
@@ -453,7 +455,8 @@ handle_query_context(POOL_CONNECTION_POOL * backend)
for (cell = list_head(session_context->temp_tables); cell; cell = next)
{
- char *tablename = (char *)lfirst(cell);
+ char *tablename = (char *) lfirst(cell);
+
ereport(DEBUG1,
(errmsg("Dropping temp table: %s", tablename)));
pool_temp_tables_delete(tablename, state);
@@ -478,7 +481,7 @@ handle_query_context(POOL_CONNECTION_POOL * backend)
}
else if (IsA(node, GrantStmt))
{
- GrantStmt *stmt = (GrantStmt *) node;
+ GrantStmt *stmt = (GrantStmt *) node;
/* REVOKE? */
if (stmt->is_grant)
@@ -510,15 +513,15 @@ handle_query_context(POOL_CONNECTION_POOL * backend)
static bool
check_alter_role_statement(AlterRoleStmt *stmt)
{
- ListCell *l;
+ ListCell *l;
foreach(l, stmt->options)
{
- DefElem *elm = (DefElem *) lfirst(l);
+ DefElem *elm = (DefElem *) lfirst(l);
/*
- * We want to detect other than ALTER ROLE foo WITH PASSWORD or
- * WITH CONNECTION LIMIT case. It does not change any privilege of the
+ * We want to detect other than ALTER ROLE foo WITH PASSWORD or WITH
+ * CONNECTION LIMIT case. It does not change any privilege of the
* role.
*/
if (strcmp(elm->defname, "password") &&
@@ -553,7 +556,8 @@ extract_ntuples(char *message)
/*
* Handle mismatch tuples
*/
-static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *packet, int packetlen, bool command_complete)
+static POOL_STATUS
+handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete)
{
POOL_SESSION_CONTEXT *session_context;
@@ -620,7 +624,7 @@ static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNE
if (session_context->mismatch_ntuples)
{
- StringInfoData msg;
+ StringInfoData msg;
initStringInfo(&msg);
appendStringInfoString(&msg, "pgpool detected difference of the number of inserted, updated or deleted tuples. Possible last query was: \"");
@@ -667,7 +671,7 @@ static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNE
* Forward Command complete packet to frontend
*/
static int
-forward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen)
+forward_command_complete(POOL_CONNECTION *frontend, char *packet, int packetlen)
{
return forward_packet_to_frontend(frontend, 'C', packet, packetlen);
}
@@ -676,7 +680,7 @@ forward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen
* Forward Empty query response to frontend
*/
static int
-forward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen)
+forward_empty_query(POOL_CONNECTION *frontend, char *packet, int packetlen)
{
return forward_packet_to_frontend(frontend, 'I', packet, packetlen);
}
@@ -685,7 +689,7 @@ forward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen)
* Forward packet to frontend
*/
static int
-forward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, int packetlen)
+forward_packet_to_frontend(POOL_CONNECTION *frontend, char kind, char *packet, int packetlen)
{
int sendlen;
@@ -705,7 +709,7 @@ forward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet,
* Process statements that need clearing query cache
*/
static void
-process_clear_cache(POOL_CONNECTION_POOL * backend)
+process_clear_cache(POOL_CONNECTION_POOL *backend)
{
/* Query cache enabled? */
if (!pool_config->memory_cache_enabled)
@@ -717,15 +721,15 @@ process_clear_cache(POOL_CONNECTION_POOL * backend)
/*
* Are we inside a transaction?
*/
- if (TSTATE(backend, MAIN_NODE_ID ) == 'T')
+ if (TSTATE(backend, MAIN_NODE_ID) == 'T')
{
/*
- * Disable query cache in this transaction.
- * All query cache will be cleared at commit.
+ * Disable query cache in this transaction. All query cache will
+ * be cleared at commit.
*/
set_query_cache_disabled_tx();
}
- else if (TSTATE(backend, MAIN_NODE_ID ) == 'I') /* outside transaction */
+ else if (TSTATE(backend, MAIN_NODE_ID) == 'I') /* outside transaction */
{
/*
* Clear all the query cache.
@@ -738,14 +742,14 @@ process_clear_cache(POOL_CONNECTION_POOL * backend)
/*
* Are we inside a transaction?
*/
- if (TSTATE(backend, MAIN_NODE_ID ) == 'T')
+ if (TSTATE(backend, MAIN_NODE_ID) == 'T')
{
/* Inside user started transaction? */
if (!INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID))
{
/*
- * Disable query cache in this transaction.
- * All query cache will be cleared at commit.
+ * Disable query cache in this transaction. All query cache
+ * will be cleared at commit.
*/
set_query_cache_disabled_tx();
}
@@ -757,7 +761,7 @@ process_clear_cache(POOL_CONNECTION_POOL * backend)
clear_query_cache();
}
}
- else if (TSTATE(backend, MAIN_NODE_ID ) == 'I') /* outside transaction */
+ else if (TSTATE(backend, MAIN_NODE_ID) == 'I') /* outside transaction */
{
/*
* Clear all the query cache.
diff --git a/src/protocol/child.c b/src/protocol/child.c
index 1ef88910d..cf2161806 100644
--- a/src/protocol/child.c
+++ b/src/protocol/child.c
@@ -68,19 +68,19 @@
#include "auth/pool_passwd.h"
#include "auth/pool_hba.h"
-static StartupPacket *read_startup_packet(POOL_CONNECTION * cp);
-static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * frontend);
+static StartupPacket *read_startup_packet(POOL_CONNECTION *cp);
+static POOL_CONNECTION_POOL *connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend);
static RETSIGTYPE die(int sig);
static RETSIGTYPE close_idle_connection(int sig);
static RETSIGTYPE wakeup_handler(int sig);
static RETSIGTYPE reload_config_handler(int sig);
static RETSIGTYPE authentication_timeout(int sig);
-static void send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend);
+static void send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static int connection_count_up(void);
static void connection_count_down(void);
-static bool connect_using_existing_connection(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
- StartupPacket *sp);
+static bool connect_using_existing_connection(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
+ StartupPacket *sp);
static void check_restart_request(void);
static void check_exit_request(void);
static void enable_authentication_timeout(void);
@@ -89,17 +89,17 @@ static int wait_for_new_connections(int *fds, SockAddr *saddr);
static void check_config_reload(void);
static void get_backends_status(unsigned int *valid_backends, unsigned int *down_backends);
static void validate_backend_connectivity(int front_end_fd);
-static POOL_CONNECTION * get_connection(int front_end_fd, SockAddr *saddr);
-static POOL_CONNECTION_POOL * get_backend_connection(POOL_CONNECTION * frontend);
+static POOL_CONNECTION *get_connection(int front_end_fd, SockAddr *saddr);
+static POOL_CONNECTION_POOL *get_backend_connection(POOL_CONNECTION *frontend);
static StartupPacket *StartupPacketCopy(StartupPacket *sp);
static void log_disconnections(char *database, char *username);
static void print_process_status(char *remote_host, char *remote_port);
-static bool backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * volatile backend, bool frontend_invalid);
+static bool backend_cleanup(POOL_CONNECTION *volatile *frontend, POOL_CONNECTION_POOL *volatile backend, bool frontend_invalid);
static void child_will_go_down(int code, Datum arg);
-static int opt_sort(const void *a, const void *b);
+static int opt_sort(const void *a, const void *b);
-static bool unix_fds_not_isset(int* fds, int num_unix_fds, fd_set* opt);
+static bool unix_fds_not_isset(int *fds, int num_unix_fds, fd_set *opt);
/*
* Non 0 means SIGTERM (smart shutdown) or SIGINT (fast shutdown) has arrived
@@ -154,11 +154,12 @@ do_child(int *fds)
sigjmp_buf local_sigjmp_buf;
POOL_CONNECTION_POOL *volatile backend = NULL;
- /* counter for child_max_connections. "volatile" declaration is necessary
+ /*
+ * counter for child_max_connections. "volatile" declaration is necessary
* so that this is counted up even if long jump is issued due to
* ereport(ERROR).
*/
- volatile int connections_count = 0;
+ volatile int connections_count = 0;
char psbuf[NI_MAXHOST + 128];
@@ -355,7 +356,8 @@ do_child(int *fds)
*/
if (con_count > (pool_config->num_init_children - pool_config->reserved_connections))
{
- POOL_CONNECTION * cp;
+ POOL_CONNECTION *cp;
+
cp = pool_open(front_end_fd, false);
if (cp == NULL)
{
@@ -405,8 +407,8 @@ do_child(int *fds)
pool_initialize_private_backend_status();
/*
- * Connect to backend. Also do authentication between
- * frontend <--> pgpool and pgpool <--> backend.
+ * Connect to backend. Also do authentication between frontend <-->
+ * pgpool and pgpool <--> backend.
*/
backend = get_backend_connection(child_frontend);
if (!backend)
@@ -513,7 +515,7 @@ do_child(int *fds)
* return true if backend connection is cached
*/
static bool
-backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * volatile backend, bool frontend_invalid)
+backend_cleanup(POOL_CONNECTION *volatile *frontend, POOL_CONNECTION_POOL *volatile backend, bool frontend_invalid)
{
StartupPacket *sp;
bool cache_connection = false;
@@ -600,18 +602,18 @@ backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * vol
* Read the startup packet and parse the contents.
*/
static StartupPacket *
-read_startup_packet(POOL_CONNECTION * cp)
+read_startup_packet(POOL_CONNECTION *cp)
{
StartupPacket *sp;
StartupPacket_v2 *sp2;
int protov;
int len;
char *p;
- char **guc_options;
- int opt_num = 0;
- char *sp_sort;
- char *tmpopt;
- int i;
+ char **guc_options;
+ int opt_num = 0;
+ char *sp_sort;
+ char *tmpopt;
+ int i;
sp = (StartupPacket *) palloc0(sizeof(*sp));
@@ -659,37 +661,38 @@ read_startup_packet(POOL_CONNECTION * cp)
case PROTO_MAJOR_V3: /* V3 */
/* copy startup_packet */
sp_sort = palloc0(len);
- memcpy(sp_sort,sp->startup_packet,len);
+ memcpy(sp_sort, sp->startup_packet, len);
p = sp_sort;
- p += sizeof(int); /* skip protocol version info */
+ p += sizeof(int); /* skip protocol version info */
/* count the number of options */
while (*p)
{
- p += (strlen(p) + 1); /* skip option name */
- p += (strlen(p) + 1); /* skip option value */
- opt_num ++;
+ p += (strlen(p) + 1); /* skip option name */
+ p += (strlen(p) + 1); /* skip option value */
+ opt_num++;
}
- guc_options = (char **)palloc0(opt_num * sizeof(char *));
+ guc_options = (char **) palloc0(opt_num * sizeof(char *));
/* get guc_option name list */
p = sp_sort + sizeof(int);
for (i = 0; i < opt_num; i++)
{
guc_options[i] = p;
- p += (strlen(p) + 1); /* skip option name */
- p += (strlen(p) + 1); /* skip option value */
+ p += (strlen(p) + 1); /* skip option name */
+ p += (strlen(p) + 1); /* skip option value */
}
/* sort option name using quick sort */
- qsort( (void *)guc_options, opt_num, sizeof(char *), opt_sort );
+ qsort((void *) guc_options, opt_num, sizeof(char *), opt_sort);
- p = sp->startup_packet + sizeof(int); /* skip protocol version info */
+ p = sp->startup_packet + sizeof(int); /* skip protocol version
+ * info */
for (i = 0; i < opt_num; i++)
{
tmpopt = guc_options[i];
- memcpy(p, tmpopt ,strlen(tmpopt) + 1); /* memcpy option name */
+ memcpy(p, tmpopt, strlen(tmpopt) + 1); /* memcpy option name */
p += (strlen(tmpopt) + 1);
tmpopt += (strlen(tmpopt) + 1);
- memcpy(p, tmpopt ,strlen(tmpopt) + 1); /* memcpy option value */
+ memcpy(p, tmpopt, strlen(tmpopt) + 1); /* memcpy option value */
p += (strlen(tmpopt) + 1);
}
@@ -733,7 +736,7 @@ read_startup_packet(POOL_CONNECTION * cp)
{
ereport(DEBUG1,
(errmsg("reading startup packet"),
- errdetail("guc name: %s value: %s", p, p+strlen(p)+1)));
+ errdetail("guc name: %s value: %s", p, p + strlen(p) + 1)));
p += (strlen(p) + 1);
}
@@ -789,8 +792,8 @@ read_startup_packet(POOL_CONNECTION * cp)
* Reuse existing connection
*/
static bool
-connect_using_existing_connection(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
+connect_using_existing_connection(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
StartupPacket *sp)
{
int i,
@@ -824,8 +827,8 @@ connect_using_existing_connection(POOL_CONNECTION * frontend,
/* Reuse existing connection to backend */
frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext,
- "frontend_auth",
- ALLOCSET_DEFAULT_SIZES);
+ "frontend_auth",
+ ALLOCSET_DEFAULT_SIZES);
oldContext = MemoryContextSwitchTo(frontend_auth_cxt);
pool_do_reauth(frontend, backend);
@@ -905,7 +908,7 @@ connect_using_existing_connection(POOL_CONNECTION * frontend,
* process cancel request
*/
void
-cancel_request(CancelPacket * sp, int32 splen)
+cancel_request(CancelPacket *sp, int32 splen)
{
int len;
int fd;
@@ -915,7 +918,7 @@ cancel_request(CancelPacket * sp, int32 splen)
k;
ConnectionInfo *c = NULL;
bool found = false;
- int32 keylen; /* cancel key length */
+ int32 keylen; /* cancel key length */
if (pool_config->log_client_messages)
ereport(LOG,
@@ -958,7 +961,8 @@ cancel_request(CancelPacket * sp, int32 splen)
errdetail("found pid:%d keylen:%d i:%d", ntohl(c->pid), c->keylen, i)));
/*
- * "c" is a pointer to i th child, j th pool, and 0 th backend.
+ * "c" is a pointer to i th child, j th pool, and 0 th
+ * backend.
*/
c = pool_coninfo(i, j, 0);
found = true;
@@ -978,12 +982,12 @@ found:
/*
* We are sending cancel request message to all backend groups. So some
- * of query cancel requests may not work but it should not be a
- * problem. They are just ignored by the backend.
+ * of query cancel requests may not work but it should not be a problem.
+ * They are just ignored by the backend.
*/
for (i = 0; i < NUM_BACKENDS; i++, c++)
{
- int32 cancel_request_code;
+ int32 cancel_request_code;
if (!VALID_BACKEND(i))
continue;
@@ -1006,9 +1010,10 @@ found:
pool_set_db_node_id(con, i);
- len = htonl(splen + sizeof(int32)); /* splen does not include packet length field */
- pool_write(con, &len, sizeof(len)); /* send cancel messages length */
- cancel_request_code = htonl(PG_PROTOCOL(1234,5678)); /* cancel request code */
+ len = htonl(splen + sizeof(int32)); /* splen does not include packet
+ * length field */
+ pool_write(con, &len, sizeof(len)); /* send cancel messages length */
+ cancel_request_code = htonl(PG_PROTOCOL(1234, 5678)); /* cancel request code */
pool_write(con, &cancel_request_code, sizeof(int32));
pool_write(con, &c->pid, sizeof(int32)); /* send pid */
pool_write(con, c->key, keylen); /* send cancel key */
@@ -1068,11 +1073,12 @@ StartupPacketCopy(StartupPacket *sp)
* Create a new connection to backend.
* Authentication is performed if requested by backend.
*/
-static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * frontend)
+static POOL_CONNECTION_POOL *
+connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend)
{
POOL_CONNECTION_POOL *backend;
StartupPacket *volatile topmem_sp = NULL;
- volatile bool topmem_sp_set = false;
+ volatile bool topmem_sp_set = false;
int i;
/* connect to the backend */
@@ -1122,8 +1128,8 @@ static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION
* do authentication stuff
*/
frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext,
- "frontend_auth",
- ALLOCSET_DEFAULT_SIZES);
+ "frontend_auth",
+ ALLOCSET_DEFAULT_SIZES);
oldContext = MemoryContextSwitchTo(frontend_auth_cxt);
/* do authentication against backend */
@@ -1141,7 +1147,8 @@ static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION
}
PG_END_TRY();
- /* At this point, we need to free previously allocated memory for the
+ /*
+ * At this point, we need to free previously allocated memory for the
* startup packet if no backend is up.
*/
if (!topmem_sp_set && topmem_sp != NULL)
@@ -1244,7 +1251,7 @@ static RETSIGTYPE close_idle_connection(int sig)
if (CONNECTION_SLOT(p, main_node_id)->closetime > 0) /* idle connection? */
{
- bool freed = false;
+ bool freed = false;
pool_send_frontend_exits(p);
@@ -1313,7 +1320,7 @@ disable_authentication_timeout(void)
* Send parameter status message to frontend.
*/
static void
-send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
+send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
int index;
char *name,
@@ -1363,7 +1370,7 @@ child_will_go_down(int code, Datum arg)
if (child_frontend)
log_disconnections(child_frontend->database, child_frontend->username);
else
- log_disconnections("","");
+ log_disconnections("", "");
}
}
@@ -1594,7 +1601,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr)
}
else
{
- int sts;
+ int sts;
for (;;)
{
@@ -1609,10 +1616,10 @@ wait_for_new_connections(int *fds, SockAddr *saddr)
if (backend_timer_expired)
{
/*
- * We add 10 seconds to connection_life_time so that there's
- * enough margin.
+ * We add 10 seconds to connection_life_time so that
+ * there's enough margin.
*/
- int seconds = pool_config->connection_life_time + 10;
+ int seconds = pool_config->connection_life_time + 10;
while (seconds-- > 0)
{
@@ -1627,7 +1634,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr)
}
}
}
- else /* success or other error */
+ else /* success or other error */
break;
}
}
@@ -1661,7 +1668,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr)
numfds = select(nsocks, &rmask, NULL, NULL, timeout);
- /* not timeout*/
+ /* not timeout */
if (numfds != 0)
break;
@@ -1796,9 +1803,10 @@ retry_accept:
}
static bool
-unix_fds_not_isset(int* fds, int num_unix_fds, fd_set* opt)
+unix_fds_not_isset(int *fds, int num_unix_fds, fd_set *opt)
{
- int i;
+ int i;
+
for (i = 0; i < num_unix_fds; i++)
{
if (!FD_ISSET(fds[i], opt))
@@ -1900,7 +1908,7 @@ validate_backend_connectivity(int front_end_fd)
error_hint,
__FILE__,
__LINE__);
-
+
}
PG_CATCH();
{
@@ -1926,7 +1934,7 @@ static POOL_CONNECTION *
get_connection(int front_end_fd, SockAddr *saddr)
{
POOL_CONNECTION *cp;
- ProcessInfo *pi;
+ ProcessInfo *pi;
ereport(DEBUG1,
(errmsg("I am %d accept fd %d", getpid(), front_end_fd)));
@@ -1993,7 +2001,7 @@ get_connection(int front_end_fd, SockAddr *saddr)
* pgpool <--> backend.
*/
static POOL_CONNECTION_POOL *
-get_backend_connection(POOL_CONNECTION * frontend)
+get_backend_connection(POOL_CONNECTION *frontend)
{
int found = 0;
StartupPacket *sp;
@@ -2049,8 +2057,8 @@ retry_startup:
* return if frontend was rejected; it simply terminates this process.
*/
MemoryContext frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext,
- "frontend_auth",
- ALLOCSET_DEFAULT_SIZES);
+ "frontend_auth",
+ ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(frontend_auth_cxt);
/*
@@ -2133,8 +2141,8 @@ retry_startup:
if (backend == NULL)
{
/*
- * Create a new connection to backend.
- * Authentication is performed if requested by backend.
+ * Create a new connection to backend. Authentication is performed if
+ * requested by backend.
*/
backend = connect_backend(sp, frontend);
}
@@ -2153,7 +2161,7 @@ static void
log_disconnections(char *database, char *username)
{
struct timeval endTime;
- long diff;
+ long diff;
long secs;
int msecs,
hours,
@@ -2161,7 +2169,7 @@ log_disconnections(char *database, char *username)
seconds;
gettimeofday(&endTime, NULL);
- diff = (long) ((endTime.tv_sec - startTime.tv_sec) * 1000000 + (endTime.tv_usec - startTime.tv_usec));
+ diff = (long) ((endTime.tv_sec - startTime.tv_sec) * 1000000 + (endTime.tv_usec - startTime.tv_usec));
msecs = (int) (diff % 1000000) / 1000;
secs = (long) (diff / 1000000);
@@ -2233,9 +2241,10 @@ pg_frontend_exists(void)
return 0;
}
-static int opt_sort(const void *a, const void *b)
+static int
+opt_sort(const void *a, const void *b)
{
- return strcmp( *(char **)a, *(char **)b);
+ return strcmp(*(char **) a, *(char **) b);
}
void
diff --git a/src/protocol/pool_connection_pool.c b/src/protocol/pool_connection_pool.c
index 666187216..c3b369dc2 100644
--- a/src/protocol/pool_connection_pool.c
+++ b/src/protocol/pool_connection_pool.c
@@ -63,8 +63,8 @@ volatile sig_atomic_t backend_timer_expired = 0; /* flag for connection
* closed timer is expired */
volatile sig_atomic_t health_check_timer_expired; /* non 0 if health check
* timer expired */
-static POOL_CONNECTION_POOL_SLOT * create_cp(POOL_CONNECTION_POOL_SLOT * cp, int slot);
-static POOL_CONNECTION_POOL * new_connection(POOL_CONNECTION_POOL * p);
+static POOL_CONNECTION_POOL_SLOT *create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot);
+static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p);
static int check_socket_status(int fd);
static bool connect_with_timeout(int fd, struct addrinfo *walk, char *host, int port, bool retry);
@@ -161,6 +161,7 @@ pool_get_cp(char *user, char *database, int protoMajor, int check_socket)
ereport(LOG,
(errmsg("connection closed."),
errdetail("retry to create new connection pool")));
+
/*
* It is possible that one of backend just broke. sleep 1
* second to wait for failover occurres, then wait for the
@@ -259,7 +260,7 @@ pool_create_cp(void)
POOL_CONNECTION_POOL *oldestp;
POOL_CONNECTION_POOL *ret;
ConnectionInfo *info;
- int main_node_id;
+ int main_node_id;
POOL_CONNECTION_POOL *p = pool_connection_pool;
@@ -297,7 +298,7 @@ pool_create_cp(void)
{
main_node_id = in_use_backend_id(p);
if (main_node_id < 0)
- elog(ERROR, "no in use backend found"); /* this should not happen */
+ elog(ERROR, "no in use backend found"); /* this should not happen */
ereport(DEBUG1,
(errmsg("creating connection pool"),
@@ -318,7 +319,7 @@ pool_create_cp(void)
p = oldestp;
main_node_id = in_use_backend_id(p);
if (main_node_id < 0)
- elog(ERROR, "no in use backend found"); /* this should not happen */
+ elog(ERROR, "no in use backend found"); /* this should not happen */
pool_send_frontend_exits(p);
ereport(DEBUG1,
@@ -358,7 +359,7 @@ pool_create_cp(void)
* set backend connection close timer
*/
void
-pool_connection_pool_timer(POOL_CONNECTION_POOL * backend)
+pool_connection_pool_timer(POOL_CONNECTION_POOL *backend)
{
POOL_CONNECTION_POOL *p = pool_connection_pool;
int i;
@@ -782,7 +783,7 @@ connect_inet_domain_socket_by_port(char *host, int port, bool retry)
struct addrinfo *res;
struct addrinfo *walk;
struct addrinfo hints;
- int retry_cnt = 5; /* getaddrinfo() retry count in case EAI_AGAIN */
+ int retry_cnt = 5; /* getaddrinfo() retry count in case EAI_AGAIN */
/*
* getaddrinfo() requires a string because it also accepts service names,
@@ -875,7 +876,8 @@ connect_inet_domain_socket_by_port(char *host, int port, bool retry)
/*
* create connection pool
*/
-static POOL_CONNECTION_POOL_SLOT * create_cp(POOL_CONNECTION_POOL_SLOT * cp, int slot)
+static POOL_CONNECTION_POOL_SLOT *
+create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot)
{
BackendInfo *b = &pool_config->backend_desc->backend_info[slot];
int fd;
@@ -902,13 +904,14 @@ static POOL_CONNECTION_POOL_SLOT * create_cp(POOL_CONNECTION_POOL_SLOT * cp, int
* Create actual connections to backends.
* New connection resides in TopMemoryContext.
*/
-static POOL_CONNECTION_POOL * new_connection(POOL_CONNECTION_POOL * p)
+static POOL_CONNECTION_POOL *
+new_connection(POOL_CONNECTION_POOL *p)
{
POOL_CONNECTION_POOL_SLOT *s;
int active_backend_count = 0;
int i;
bool status_changed = false;
- volatile BACKEND_STATUS status;
+ volatile BACKEND_STATUS status;
MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
@@ -1097,7 +1100,7 @@ close_all_backend_connections(void)
for (i = 0; i < pool_config->max_pool; i++, p++)
{
- int backend_id = in_use_backend_id(p);
+ int backend_id = in_use_backend_id(p);
if (backend_id < 0)
continue;
@@ -1120,9 +1123,10 @@ close_all_backend_connections(void)
void
update_pooled_connection_count(void)
{
- int i;
- int count = 0;
+ int i;
+ int count = 0;
POOL_CONNECTION_POOL *p = pool_connection_pool;
+
for (i = 0; i < pool_config->max_pool; i++, p++)
{
if (MAIN_CONNECTION(p))
@@ -1138,7 +1142,7 @@ update_pooled_connection_count(void)
int
in_use_backend_id(POOL_CONNECTION_POOL *pool)
{
- int i;
+ int i;
for (i = 0; i < NUM_BACKENDS; i++)
{
diff --git a/src/protocol/pool_pg_utils.c b/src/protocol/pool_pg_utils.c
index ccbe2b03c..2eebbbb3c 100644
--- a/src/protocol/pool_pg_utils.c
+++ b/src/protocol/pool_pg_utils.c
@@ -41,7 +41,7 @@
#include "pool_config_variables.h"
static int choose_db_node_id(char *str);
-static void free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp);
+static void free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT *cp);
static void si_enter_critical_region(void);
static void si_leave_critical_region(void);
@@ -64,9 +64,9 @@ make_persistent_db_connection(
{
int protoVersion;
char data[MAX_USER_AND_DATABASE];
- } StartupPacket_v3;
+ } StartupPacket_v3;
- static StartupPacket_v3 * startup_packet;
+ static StartupPacket_v3 *startup_packet;
int len,
len1;
@@ -200,8 +200,8 @@ make_persistent_db_connection_noerror(
* receives an ERROR, it stops processing and terminates, which is not
* good. This is problematic especially with pcp_node_info, since it
* calls db_node_role(), and db_node_role() calls this function. So if
- * the target PostgreSQL is down, EmitErrorReport() sends ERROR message
- * to pcp frontend and it stops (see process_pcp_response() in
+ * the target PostgreSQL is down, EmitErrorReport() sends ERROR
+ * message to pcp frontend and it stops (see process_pcp_response() in
* src/libs/pcp/pcp.c. To fix this, just eliminate calling
* EmitErrorReport(). This will suppress ERROR message but as you can
* see the comment in this function "does not ereports in case of an
@@ -221,7 +221,7 @@ make_persistent_db_connection_noerror(
* make_persistent_db_connection and discard_persistent_db_connection.
*/
static void
-free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp)
+free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT *cp)
{
if (!cp)
return;
@@ -245,7 +245,7 @@ free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp)
* make_persistent_db_connection().
*/
void
-discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT * cp)
+discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT *cp)
{
int len;
@@ -274,7 +274,7 @@ discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT * cp)
* send startup packet
*/
void
-send_startup_packet(POOL_CONNECTION_POOL_SLOT * cp)
+send_startup_packet(POOL_CONNECTION_POOL_SLOT *cp)
{
int len;
@@ -319,10 +319,10 @@ select_load_balancing_node(void)
int tmp;
int no_load_balance_node_id = -2;
uint64 lowest_delay;
- int lowest_delay_nodes[NUM_BACKENDS];
+ int lowest_delay_nodes[NUM_BACKENDS];
/* prng state data for load balancing */
- static pg_prng_state backsel_state;
+ static pg_prng_state backsel_state;
/*
* -2 indicates there's no database_redirect_preference_list. -1 indicates
@@ -443,24 +443,27 @@ select_load_balancing_node(void)
if (suggested_node_id >= 0)
{
/*
- * If pgpool is running in Streaming Replication mode and delay_threshold
- * and prefer_lower_delay_standby are true, we choose the least delayed
- * node if suggested_node is standby and delayed over delay_threshold.
+ * If pgpool is running in Streaming Replication mode and
+ * delay_threshold and prefer_lower_delay_standby are true, we choose
+ * the least delayed node if suggested_node is standby and delayed
+ * over delay_threshold.
*/
if (STREAM && pool_config->prefer_lower_delay_standby &&
suggested_node_id != PRIMARY_NODE_ID &&
check_replication_delay(suggested_node_id) < 0)
{
ereport(DEBUG1,
- (errmsg("selecting load balance node"),
- errdetail("suggested backend %d is streaming delayed over delay_threshold", suggested_node_id)));
+ (errmsg("selecting load balance node"),
+ errdetail("suggested backend %d is streaming delayed over delay_threshold", suggested_node_id)));
/*
- * The new load balancing node is selected from the
- * nodes which have the lowest delay.
+ * The new load balancing node is selected from the nodes which
+ * have the lowest delay.
*/
if (pool_config->delay_threshold_by_time > 0)
- lowest_delay = pool_config->delay_threshold_by_time * 1000; /* convert from milli seconds to micro seconds */
+ lowest_delay = pool_config->delay_threshold_by_time * 1000; /* convert from milli
+ * seconds to micro
+ * seconds */
else
lowest_delay = pool_config->delay_threshold;
@@ -484,7 +487,8 @@ select_load_balancing_node(void)
}
else if (lowest_delay > BACKEND_INFO(i).standby_delay)
{
- int ii;
+ int ii;
+
lowest_delay = BACKEND_INFO(i).standby_delay;
for (ii = 0; ii < NUM_BACKENDS; ii++)
{
@@ -628,7 +632,8 @@ select_load_balancing_node(void)
}
else if (lowest_delay > BACKEND_INFO(i).standby_delay)
{
- int ii;
+ int ii;
+
lowest_delay = BACKEND_INFO(i).standby_delay;
for (ii = 0; ii < NUM_BACKENDS; ii++)
{
@@ -679,13 +684,13 @@ static void
initialize_prng(pg_prng_state *state)
{
static bool prng_seed_set = false;
- uint64 seed;
+ uint64 seed;
if (unlikely(!prng_seed_set))
{
/* initialize prng */
if (!pg_strong_random(&seed, sizeof(seed)))
- seed = UINT64CONST(1); /* Pick a value, as long as it spreads */
+ seed = UINT64CONST(1); /* Pick a value, as long as it spreads */
pg_prng_seed(state, seed);
prng_seed_set = true;
}
@@ -702,17 +707,17 @@ initialize_prng(pg_prng_state *state)
*
*/
PGVersion *
-Pgversion(POOL_CONNECTION_POOL * backend)
+Pgversion(POOL_CONNECTION_POOL *backend)
{
#define VERSION_BUF_SIZE 10
- static PGVersion pgversion;
- static POOL_RELCACHE *relcache;
- char *result;
- char *p;
- char buf[VERSION_BUF_SIZE];
- int i;
- int major;
- int minor;
+ static PGVersion pgversion;
+ static POOL_RELCACHE *relcache;
+ char *result;
+ char *p;
+ char buf[VERSION_BUF_SIZE];
+ int i;
+ int major;
+ int minor;
/*
* First, check local cache. If cache is set, just return it.
@@ -743,7 +748,7 @@ Pgversion(POOL_CONNECTION_POOL * backend)
/*
* Search relcache.
*/
- result = (char *)pool_search_relcache(relcache, backend, "version");
+ result = (char *) pool_search_relcache(relcache, backend, "version");
if (result == 0)
{
ereport(FATAL,
@@ -801,7 +806,7 @@ Pgversion(POOL_CONNECTION_POOL * backend)
{
p++;
i = 0;
- while (i < VERSION_BUF_SIZE -1 && p && *p != '.' && *p != ' ')
+ while (i < VERSION_BUF_SIZE - 1 && p && *p != '.' && *p != ' ')
{
buf[i++] = *p++;
}
@@ -817,7 +822,7 @@ Pgversion(POOL_CONNECTION_POOL * backend)
*/
p++;
i = 0;
- while (i < VERSION_BUF_SIZE -1 && p && *p != '.' && *p != ' ')
+ while (i < VERSION_BUF_SIZE - 1 && p && *p != '.' && *p != ' ')
{
buf[i++] = *p++;
}
@@ -879,6 +884,7 @@ choose_db_node_id(char *str)
}
return node_id;
}
+
/*
*---------------------------------------------------------------------------------
* Snapshot Isolation modules
@@ -1003,7 +1009,7 @@ void
si_snapshot_acquired(void)
{
POOL_SESSION_CONTEXT *session;
- int i;
+ int i;
session = pool_get_session_context(true);
@@ -1018,9 +1024,10 @@ si_snapshot_acquired(void)
if (si_manage_info->snapshot_counter == 0)
{
/* wakeup all waiting children */
- for (i = 0; i < pool_config->num_init_children ; i++)
+ for (i = 0; i < pool_config->num_init_children; i++)
{
- pid_t pid = si_manage_info->snapshot_waiting_children[i];
+ pid_t pid = si_manage_info->snapshot_waiting_children[i];
+
if (pid > 0)
{
elog(SI_DEBUG_LOG_LEVEL, "si_snapshot_acquired: send SIGUSR2 to %d", pid);
@@ -1076,7 +1083,7 @@ void
si_commit_done(void)
{
POOL_SESSION_CONTEXT *session;
- int i;
+ int i;
session = pool_get_session_context(true);
@@ -1092,9 +1099,10 @@ si_commit_done(void)
if (si_manage_info->commit_counter == 0)
{
/* wakeup all waiting children */
- for (i = 0; i < pool_config->num_init_children ; i++)
+ for (i = 0; i < pool_config->num_init_children; i++)
{
- pid_t pid = si_manage_info->commit_waiting_children[i];
+ pid_t pid = si_manage_info->commit_waiting_children[i];
+
if (pid > 0)
{
elog(SI_DEBUG_LOG_LEVEL, "si_commit_done: send SIGUSR2 to %d", pid);
@@ -1117,7 +1125,8 @@ si_commit_done(void)
* -1: delay exceeds delay_threshold_by_time
* -2: delay exceeds delay_threshold
*/
-int check_replication_delay(int node_id)
+int
+check_replication_delay(int node_id)
{
BackendInfo *bkinfo;
@@ -1132,7 +1141,7 @@ int check_replication_delay(int node_id)
* to multiply delay_threshold_by_time by 1000 to normalize.
*/
if (pool_config->delay_threshold_by_time > 0 &&
- bkinfo->standby_delay > pool_config->delay_threshold_by_time*1000)
+ bkinfo->standby_delay > pool_config->delay_threshold_by_time * 1000)
return -1;
/*
@@ -1144,4 +1153,3 @@ int check_replication_delay(int node_id)
return 0;
}
-
diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c
index 5a6b97ba1..b69cb3d52 100644
--- a/src/protocol/pool_process_query.c
+++ b/src/protocol/pool_process_query.c
@@ -80,28 +80,28 @@
#define IDLE_IN_TRANSACTION_SESSION_TIMEOUT_ERROR_CODE "25P03"
#define IDLE_SESSION_TIMEOUT_ERROR_CODE "57P05"
-static int reset_backend(POOL_CONNECTION_POOL * backend, int qcnt);
+static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt);
static char *get_insert_command_table_name(InsertStmt *node);
-static bool is_cache_empty(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend);
+static bool is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static bool is_panic_or_fatal_error(char *message, int major);
-static int extract_message(POOL_CONNECTION * backend, char *error_code, int major, char class, bool unread);
-static int detect_postmaster_down_error(POOL_CONNECTION * backend, int major);
+static int extract_message(POOL_CONNECTION *backend, char *error_code, int major, char class, bool unread);
+static int detect_postmaster_down_error(POOL_CONNECTION *backend, int major);
static bool is_internal_transaction_needed(Node *node);
static bool pool_has_insert_lock(void);
-static POOL_STATUS add_lock_target(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *table);
-static bool has_lock_target(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *table, bool for_update);
-static POOL_STATUS insert_oid_into_insert_lock(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *table);
-static POOL_STATUS read_packets_and_process(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int reset_request, int *state, short *num_fields, bool *cont);
+static POOL_STATUS add_lock_target(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *table);
+static bool has_lock_target(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *table, bool for_update);
+static POOL_STATUS insert_oid_into_insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *table);
+static POOL_STATUS read_packets_and_process(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int reset_request, int *state, short *num_fields, bool *cont);
static bool is_all_standbys_command_complete(unsigned char *kind_list, int num_backends, int main_node);
-static bool pool_process_notice_message_from_one_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int backend_idx, char kind);
+static bool pool_process_notice_message_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int backend_idx, char kind);
/*
* Main module for query processing
* reset_request: if non 0, call reset_backend to execute reset queries
*/
POOL_STATUS
-pool_process_query(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
+pool_process_query(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
int reset_request)
{
short num_fields = 0; /* the number of fields in a row (V2 protocol) */
@@ -135,8 +135,8 @@ pool_process_query(POOL_CONNECTION * frontend,
}
/*
- * Reset error flag while processing reset queries.
- * The flag is set to on inside pool_send_and_wait().
+ * Reset error flag while processing reset queries. The flag is set to on
+ * inside pool_send_and_wait().
*/
reset_query_error = false;
@@ -306,8 +306,7 @@ pool_process_query(POOL_CONNECTION * frontend,
else
{
/*
- * If we have pending data in main, we need to process
- * it
+ * If we have pending data in main, we need to process it
*/
if (pool_ssl_pending(MAIN(backend)) ||
!pool_read_buffer_is_empty(MAIN(backend)))
@@ -327,8 +326,8 @@ pool_process_query(POOL_CONNECTION * frontend,
!pool_read_buffer_is_empty(CONNECTION(backend, i)))
{
/*
- * If we have pending data in main, we need
- * to process it
+ * If we have pending data in main, we need to
+ * process it
*/
if (IS_MAIN_NODE_ID(i))
{
@@ -344,9 +343,8 @@ pool_process_query(POOL_CONNECTION * frontend,
char *string;
/*
- * If main does not have pending data,
- * we discard one packet from other
- * backend
+ * If main does not have pending data, we
+ * discard one packet from other backend
*/
pool_read_with_error(CONNECTION(backend, i), &kind, sizeof(kind),
"reading message kind from backend");
@@ -358,22 +356,22 @@ pool_process_query(POOL_CONNECTION * frontend,
int sendlen;
/*
- * In native replication mode we may
- * send the query to the standby
- * node and the NOTIFY comes back
- * only from primary node. But
- * since we have sent the query to
- * the standby, so the current
- * MAIN_NODE_ID will be pointing
- * to the standby node. And we
- * will get stuck if we keep
- * waiting for the current main
- * node (standby) in this case to
- * send us the NOTIFY message. see
- * "0000116: LISTEN Notifications
- * Not Reliably Delivered Using
- * JDBC4 Demonstrator" for the
- * scenario
+ * In native replication mode we
+ * may send the query to the
+ * standby node and the NOTIFY
+ * comes back only from primary
+ * node. But since we have sent
+ * the query to the standby, so
+ * the current MAIN_NODE_ID will
+ * be pointing to the standby
+ * node. And we will get stuck if
+ * we keep waiting for the current
+ * main node (standby) in this
+ * case to send us the NOTIFY
+ * message. see "0000116: LISTEN
+ * Notifications Not Reliably
+ * Delivered Using JDBC4
+ * Demonstrator" for the scenario
*/
pool_read_with_error(CONNECTION(backend, i), &len, sizeof(len),
"reading message length from backend");
@@ -396,11 +394,11 @@ pool_process_query(POOL_CONNECTION * frontend,
* sent to all backends. However
* the order of arrival of
* 'Notification response' is not
- * necessarily the main first
- * and then standbys. So if it
- * arrives standby first, we should
- * try to read from main, rather
- * than just discard it.
+ * necessarily the main first and
+ * then standbys. So if it arrives
+ * standby first, we should try to
+ * read from main, rather than
+ * just discard it.
*/
pool_unread(CONNECTION(backend, i), &kind, sizeof(kind));
ereport(LOG,
@@ -473,6 +471,7 @@ pool_process_query(POOL_CONNECTION * frontend,
if (pool_config->memory_cache_enabled)
{
volatile bool invalidate_request = Req_info->query_cache_invalidate_request;
+
if (invalidate_request)
{
/*
@@ -489,7 +488,7 @@ pool_process_query(POOL_CONNECTION * frontend,
* send simple query message to a node.
*/
void
-send_simplequery_message(POOL_CONNECTION * backend, int len, char *string, int major)
+send_simplequery_message(POOL_CONNECTION *backend, int len, char *string, int major)
{
/* forward the query to the backend */
pool_write(backend, "Q", 1);
@@ -512,7 +511,7 @@ send_simplequery_message(POOL_CONNECTION * backend, int len, char *string, int m
*/
void
-wait_for_query_response_with_trans_cleanup(POOL_CONNECTION * frontend, POOL_CONNECTION * backend,
+wait_for_query_response_with_trans_cleanup(POOL_CONNECTION *frontend, POOL_CONNECTION *backend,
int protoVersion, int pid, char *key, int keylen)
{
PG_TRY();
@@ -545,7 +544,7 @@ wait_for_query_response_with_trans_cleanup(POOL_CONNECTION * frontend, POOL_CONN
* response.
*/
POOL_STATUS
-wait_for_query_response(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, int protoVersion)
+wait_for_query_response(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, int protoVersion)
{
#define DUMMY_PARAMETER "pgpool_dummy_param"
#define DUMMY_VALUE "pgpool_dummy_value"
@@ -632,7 +631,7 @@ wait_for_query_response(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, i
* Extended query protocol has to send Flush message.
*/
POOL_STATUS
-send_extended_protocol_message(POOL_CONNECTION_POOL * backend,
+send_extended_protocol_message(POOL_CONNECTION_POOL *backend,
int node_id, char *kind,
int len, char *string)
{
@@ -665,7 +664,7 @@ send_extended_protocol_message(POOL_CONNECTION_POOL * backend,
* wait until read data is ready
*/
int
-synchronize(POOL_CONNECTION * cp)
+synchronize(POOL_CONNECTION *cp)
{
return pool_check_fd(cp);
}
@@ -678,7 +677,7 @@ synchronize(POOL_CONNECTION * cp)
* valid backends might be changed by failover/failback.
*/
void
-pool_send_frontend_exits(POOL_CONNECTION_POOL * backend)
+pool_send_frontend_exits(POOL_CONNECTION_POOL *backend)
{
int len;
int i;
@@ -720,8 +719,8 @@ pool_send_frontend_exits(POOL_CONNECTION_POOL * backend)
*/
POOL_STATUS
-SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
int len,
len1 = 0;
@@ -786,13 +785,13 @@ SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend,
/*
* Optimization for other than "Command Complete", "Ready For query",
- * "Error response" ,"Notice message", "Notification response",
- * "Row description", "No data" and "Close Complete".
- * messages. Especially, since it is too often to receive and forward
- * "Data Row" message, we do not flush the message to frontend now. We
- * expect that "Command Complete" message (or "Error response" or "Notice
- * response" message) follows the stream of data row message anyway, so
- * flushing will be done at that time.
+ * "Error response" ,"Notice message", "Notification response", "Row
+ * description", "No data" and "Close Complete". messages. Especially,
+ * since it is too often to receive and forward "Data Row" message, we do
+ * not flush the message to frontend now. We expect that "Command
+ * Complete" message (or "Error response" or "Notice response" message)
+ * follows the stream of data row message anyway, so flushing will be done
+ * at that time.
*
* Same thing can be said to CopyData message. Tremendous number of
* CopyData messages are sent to frontend (typical use case is pg_dump).
@@ -851,8 +850,8 @@ SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend,
}
POOL_STATUS
-SimpleForwardToBackend(char kind, POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
+SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
int len, char *contents)
{
int sendlen;
@@ -912,7 +911,7 @@ SimpleForwardToBackend(char kind, POOL_CONNECTION * frontend,
* Handle parameter status message
*/
POOL_STATUS
-ParameterStatus(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
+ParameterStatus(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
int len,
len1 = 0;
@@ -922,7 +921,8 @@ ParameterStatus(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
char *name;
char *value;
POOL_STATUS status;
- char *parambuf = NULL; /* pointer to parameter + value string buffer */
+ char *parambuf = NULL; /* pointer to parameter + value string
+ * buffer */
int i;
pool_write(frontend, "S", 1);
@@ -959,15 +959,16 @@ ParameterStatus(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
if (IS_MAIN_NODE_ID(i))
{
- int pos;
+ int pos;
len1 = len;
+
/*
* To suppress Coverity false positive warning. Actually
* being IS_MAIN_NODE_ID(i)) true only happens in a loop. So
* we don't need to worry about to leak memory previously
- * allocated in parambuf. But Coverity is not smart enough
- * to realize it.
+ * allocated in parambuf. But Coverity is not smart enough to
+ * realize it.
*/
if (parambuf)
pfree(parambuf);
@@ -984,7 +985,8 @@ ParameterStatus(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
else
{
/*
- * Except "in_hot_standby" parameter, complain the message length difference.
+ * Except "in_hot_standby" parameter, complain the message
+ * length difference.
*/
if (strcmp(name, "in_hot_standby"))
{
@@ -1037,7 +1039,7 @@ reset_connection(void)
* 0: no query was issued 1: a query was issued 2: no more queries remain -1: error
*/
static int
-reset_backend(POOL_CONNECTION_POOL * backend, int qcnt)
+reset_backend(POOL_CONNECTION_POOL *backend, int qcnt)
{
char *query;
int qn;
@@ -1129,7 +1131,7 @@ reset_backend(POOL_CONNECTION_POOL * backend, int qcnt)
bool
is_select_query(Node *node, char *sql)
{
- bool prepare = false;
+ bool prepare = false;
if (node == NULL)
return false;
@@ -1154,6 +1156,7 @@ is_select_query(Node *node, char *sql)
if (IsA(node, PrepareStmt))
{
PrepareStmt *prepare_statement = (PrepareStmt *) node;
+
prepare = true;
node = (Node *) (prepare_statement->query);
}
@@ -1183,15 +1186,15 @@ is_select_query(Node *node, char *sql)
}
/*
- * If SQL comment is not allowed, the query must start with certain characters.
- * However if it's PREPARE, we should skip the check.
+ * If SQL comment is not allowed, the query must start with certain
+ * characters. However if it's PREPARE, we should skip the check.
*/
if (!pool_config->allow_sql_comments)
/* '\0' and ';' signify empty query */
return (*sql == 's' || *sql == 'S' || *sql == '(' ||
*sql == 'w' || *sql == 'W' || *sql == 't' || *sql == 'T' ||
*sql == '\0' || *sql == ';' ||
- prepare);
+ prepare);
else
return true;
}
@@ -1325,7 +1328,7 @@ is_rollback_to_query(Node *node)
* send error message to frontend
*/
void
-pool_send_error_message(POOL_CONNECTION * frontend, int protoMajor,
+pool_send_error_message(POOL_CONNECTION *frontend, int protoMajor,
char *code,
char *message,
char *detail,
@@ -1340,7 +1343,7 @@ pool_send_error_message(POOL_CONNECTION * frontend, int protoMajor,
* send fatal message to frontend
*/
void
-pool_send_fatal_message(POOL_CONNECTION * frontend, int protoMajor,
+pool_send_fatal_message(POOL_CONNECTION *frontend, int protoMajor,
char *code,
char *message,
char *detail,
@@ -1355,7 +1358,7 @@ pool_send_fatal_message(POOL_CONNECTION * frontend, int protoMajor,
* send severity message to frontend
*/
void
-pool_send_severity_message(POOL_CONNECTION * frontend, int protoMajor,
+pool_send_severity_message(POOL_CONNECTION *frontend, int protoMajor,
char *code,
char *message,
char *detail,
@@ -1460,7 +1463,7 @@ pool_send_severity_message(POOL_CONNECTION * frontend, int protoMajor,
}
void
-pool_send_readyforquery(POOL_CONNECTION * frontend)
+pool_send_readyforquery(POOL_CONNECTION *frontend)
{
int len;
@@ -1481,7 +1484,7 @@ pool_send_readyforquery(POOL_CONNECTION * frontend)
* length for ReadyForQuery. This mode is necessary when called from ReadyForQuery().
*/
POOL_STATUS
-do_command(POOL_CONNECTION * frontend, POOL_CONNECTION * backend,
+do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend,
char *query, int protoMajor, int pid, char *key, int keylen, int no_ready_for_query)
{
int len;
@@ -1496,8 +1499,8 @@ do_command(POOL_CONNECTION * frontend, POOL_CONNECTION * backend,
send_simplequery_message(backend, strlen(query) + 1, query, protoMajor);
/*
- * Wait for response from backend while polling frontend connection is
- * ok. If not, cancel the transaction.
+ * Wait for response from backend while polling frontend connection is ok.
+ * If not, cancel the transaction.
*/
wait_for_query_response_with_trans_cleanup(frontend,
backend,
@@ -1696,7 +1699,7 @@ retry_read_packet:
* If SELECT is error, we must abort transaction on other nodes.
*/
void
-do_error_command(POOL_CONNECTION * backend, int major)
+do_error_command(POOL_CONNECTION *backend, int major)
{
char *error_query = POOL_ERROR_QUERY;
int len;
@@ -1753,7 +1756,7 @@ do_error_command(POOL_CONNECTION * backend, int major)
* than main node to ket them go into abort status.
*/
void
-do_error_execute_command(POOL_CONNECTION_POOL * backend, int node_id, int major)
+do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major)
{
char kind;
char *string;
@@ -1852,7 +1855,7 @@ do_error_execute_command(POOL_CONNECTION_POOL * backend, int node_id, int major)
* Free POOL_SELECT_RESULT object
*/
void
-free_select_result(POOL_SELECT_RESULT * result)
+free_select_result(POOL_SELECT_RESULT *result)
{
int i,
j;
@@ -1905,7 +1908,7 @@ free_select_result(POOL_SELECT_RESULT * result)
* to void. and now ereport is thrown in case of error occurred within the function
*/
void
-do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, int major)
+do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT **result, int major)
{
#define DO_QUERY_ALLOC_NUM 1024 /* memory allocation unit for
* POOL_SELECT_RESULT */
@@ -1970,10 +1973,9 @@ do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, i
/*
* Send a query to the backend. We use extended query protocol with named
- * statement/portal if we are processing extended query since simple
- * query breaks unnamed statements/portals. The name of named
- * statement/unnamed statement are "pgpool_PID" where PID is the process id
- * of itself.
+ * statement/portal if we are processing extended query since simple query
+ * breaks unnamed statements/portals. The name of named statement/unnamed
+ * statement are "pgpool_PID" where PID is the process id of itself.
*/
if (pool_get_session_context(true) && pool_is_doing_extended_query_message())
{
@@ -2140,7 +2142,8 @@ do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, i
if (pool_extract_error_message(false, backend, major, true, &message) == 1)
{
- int etype;
+ int etype;
+
/*
* This is fatal. Because: If we operate extended query,
* backend would not accept subsequent commands until "sync"
@@ -2150,9 +2153,8 @@ do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, i
* transaction is aborted, and subsequent query would not
* accepted. In summary there's no transparent way for
* frontend to handle error case. The only way is closing this
- * session.
- * However if the process type is main process, we should not
- * exit the process.
+ * session. However if the process type is main process, we
+ * should not exit the process.
*/
if (processType == PT_WORKER)
{
@@ -2499,7 +2501,7 @@ do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, i
* 3: row lock against insert_lock table is required
*/
int
-need_insert_lock(POOL_CONNECTION_POOL * backend, char *query, Node *node)
+need_insert_lock(POOL_CONNECTION_POOL *backend, char *query, Node *node)
{
/*
* Query to know if the target table has SERIAL column or not.
@@ -2519,7 +2521,7 @@ need_insert_lock(POOL_CONNECTION_POOL * backend, char *query, Node *node)
char *table;
int result;
- static POOL_RELCACHE * relcache;
+ static POOL_RELCACHE *relcache;
/* INSERT statement? */
if (!IsA(node, InsertStmt))
@@ -2619,7 +2621,7 @@ need_insert_lock(POOL_CONNECTION_POOL * backend, char *query, Node *node)
* [ADMIN] 'SGT DETAIL: Could not open file "pg_clog/05DC": ...
*/
POOL_STATUS
-insert_lock(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *query, InsertStmt *node, int lock_kind)
+insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query, InsertStmt *node, int lock_kind)
{
char *table;
int len = 0;
@@ -2631,7 +2633,7 @@ insert_lock(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *qu
regex_t preg;
size_t nmatch = 2;
regmatch_t pmatch[nmatch];
- static POOL_RELCACHE * relcache;
+ static POOL_RELCACHE *relcache;
POOL_SELECT_RESULT *result;
POOL_STATUS status = POOL_CONTINUE;
@@ -2827,7 +2829,7 @@ insert_lock(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *qu
else
{
status = do_command(frontend, MAIN(backend), qbuf, MAJOR(backend), MAIN_CONNECTION(backend)->pid,
- MAIN_CONNECTION(backend)->key, MAIN_CONNECTION(backend)->keylen ,0);
+ MAIN_CONNECTION(backend)->key, MAIN_CONNECTION(backend)->keylen, 0);
}
}
}
@@ -2895,7 +2897,7 @@ pool_has_insert_lock(void)
*/
bool result;
- static POOL_RELCACHE * relcache;
+ static POOL_RELCACHE *relcache;
POOL_CONNECTION_POOL *backend;
backend = pool_get_session_context(false)->backend;
@@ -2923,7 +2925,8 @@ pool_has_insert_lock(void)
* Return POOL_CONTINUE if the row is inserted successfully
* or the row already exists, the others return POOL_ERROR.
*/
-static POOL_STATUS add_lock_target(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *table)
+static POOL_STATUS
+add_lock_target(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *table)
{
/*
* lock the row where reloid is 0 to avoid "duplicate key violates..."
@@ -2984,8 +2987,8 @@ static POOL_STATUS add_lock_target(POOL_CONNECTION * frontend, POOL_CONNECTION_P
* If lock is true, this function locks the row of the table oid.
*/
static bool
-has_lock_target(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
+has_lock_target(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
char *table, bool lock)
{
char *suffix;
@@ -3025,9 +3028,10 @@ has_lock_target(POOL_CONNECTION * frontend,
/*
* Insert the oid of the specified table into insert_lock table.
*/
-static POOL_STATUS insert_oid_into_insert_lock(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
- char *table)
+static POOL_STATUS
+insert_oid_into_insert_lock(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
+ char *table)
{
char qbuf[QUERY_STRING_BUFFER_LEN];
POOL_STATUS status;
@@ -3093,7 +3097,7 @@ is_drop_database(Node *node)
* check if any pending data remains in backend.
*/
bool
-is_backend_cache_empty(POOL_CONNECTION_POOL * backend)
+is_backend_cache_empty(POOL_CONNECTION_POOL *backend)
{
int i;
@@ -3120,14 +3124,14 @@ is_backend_cache_empty(POOL_CONNECTION_POOL * backend)
* check if any pending data remains.
*/
static bool
-is_cache_empty(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
+is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
/* Are we suspending reading from frontend? */
if (!pool_is_suspend_reading_from_frontend())
{
/*
- * If SSL is enabled, we need to check SSL internal buffer is empty or not
- * first.
+ * If SSL is enabled, we need to check SSL internal buffer is empty or
+ * not first.
*/
if (pool_ssl_pending(frontend))
return false;
@@ -3228,7 +3232,7 @@ check_copy_from_stdin(Node *node)
* read kind from one backend
*/
void
-read_kind_from_one_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *kind, int node)
+read_kind_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *kind, int node)
{
if (VALID_BACKEND(node))
{
@@ -3275,7 +3279,7 @@ is_all_standbys_command_complete(unsigned char *kind_list, int num_backends, int
* this function uses "decide by majority" method if kinds from all backends do not agree.
*/
void
-read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *decided_kind)
+read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *decided_kind)
{
int i;
unsigned char kind_list[MAX_NUM_BACKENDS]; /* records each backend's kind */
@@ -3378,9 +3382,9 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
read_kind_from_one_backend(frontend, backend, (char *) &kind, MAIN_NODE_ID);
/*
- * If we received a notification message in native replication mode, other
- * backends will not receive the message. So we should skip other
- * nodes otherwise we will hang in pool_read.
+ * If we received a notification message in native replication mode,
+ * other backends will not receive the message. So we should skip
+ * other nodes otherwise we will hang in pool_read.
*/
if (kind == 'A')
{
@@ -3454,7 +3458,7 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
*/
else if (kind == 'S')
{
- int len2;
+ int len2;
pool_read(CONNECTION(backend, i), &len, sizeof(len));
len2 = len;
@@ -3469,7 +3473,8 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
if (IS_MAIN_NODE_ID(i))
{
- int pos;
+ int pos;
+
pool_add_param(&CONNECTION(backend, i)->params, p, value);
if (!strcmp("application_name", p))
@@ -3525,16 +3530,16 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
*/
for (i = 0; i < NUM_BACKENDS; i++)
{
- int unread_len;
- char *unread_p;
- char *p;
- int len;
+ int unread_len;
+ char *unread_p;
+ char *p;
+ int len;
if (VALID_BACKEND(i))
{
if (kind_list[i] == 'E')
{
- int major = MAJOR(CONNECTION(backend, i));
+ int major = MAJOR(CONNECTION(backend, i));
if (major == PROTO_MAJOR_V3)
{
@@ -3618,8 +3623,8 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
* cases it is possible that similar issue could happen since returned
* messages do not follow the sequence recorded in the pending
* messages because the backend ignores requests till sync message is
- * received. In this case we need to re-sync either primary or standby.
- * So we check not only the standby but primary node.
+ * received. In this case we need to re-sync either primary or
+ * standby. So we check not only the standby but primary node.
*/
if (session_context->load_balance_node_id != MAIN_NODE_ID &&
(kind_list[MAIN_NODE_ID] == 'Z' ||
@@ -3697,8 +3702,8 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
/*
* In main/replica mode, if primary gets an error at commit, while
- * other standbys are normal at commit, we don't need to degenerate any
- * backend because it is likely that the error was caused by a
+ * other standbys are normal at commit, we don't need to degenerate
+ * any backend because it is likely that the error was caused by a
* deferred trigger.
*/
else if (MAIN_REPLICA && query_context->parse_tree &&
@@ -3712,7 +3717,7 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
errdetail("do not degenerate because it is likely caused by a delayed commit")));
if (SL_MODE && pool_is_doing_extended_query_message() && msg)
- pool_pending_message_free_pending_message(msg);
+ pool_pending_message_free_pending_message(msg);
return;
}
else if (max_count <= NUM_BACKENDS / 2.0)
@@ -3755,7 +3760,7 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
if (degenerate_node_num)
{
int retcode = 2;
- StringInfoData msg;
+ StringInfoData msg;
initStringInfo(&msg);
appendStringInfoString(&msg, "kind mismatch among backends. ");
@@ -3948,7 +3953,7 @@ parse_copy_data(char *buf, int len, char delimiter, int col_id)
}
void
-query_ps_status(char *query, POOL_CONNECTION_POOL * backend)
+query_ps_status(char *query, POOL_CONNECTION_POOL *backend)
{
StartupPacket *sp;
char psbuf[1024];
@@ -4128,7 +4133,7 @@ is_internal_transaction_needed(Node *node)
* Start an internal transaction if necessary.
*/
POOL_STATUS
-start_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node *node)
+start_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, Node *node)
{
int i;
@@ -4172,7 +4177,7 @@ start_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * ba
* that satisfy VALID_BACKEND macro.
*/
POOL_STATUS
-end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
+end_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
int i;
int len;
@@ -4253,10 +4258,9 @@ end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * back
if (MAJOR(backend) == PROTO_MAJOR_V3 && VALID_BACKEND(MAIN_NODE_ID))
{
/*
- * Skip rest of Ready for Query packet for backends
- * satisfying VALID_BACKEND macro because they should have
- * been already received the data, which is not good for
- * do_command().
+ * Skip rest of Ready for Query packet for backends satisfying
+ * VALID_BACKEND macro because they should have been already
+ * received the data, which is not good for do_command().
*/
pool_read(CONNECTION(backend, MAIN_NODE_ID), &len, sizeof(len));
pool_read(CONNECTION(backend, MAIN_NODE_ID), &tstate, sizeof(tstate));
@@ -4291,8 +4295,8 @@ end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * back
if (MAJOR(backend) == PROTO_MAJOR_V3 && !VALID_BACKEND(MAIN_NODE_ID))
{
/*
- * Skip rest of Ready for Query packet for the backend
- * that does not satisfy VALID_BACKEND.
+ * Skip rest of Ready for Query packet for the backend that
+ * does not satisfy VALID_BACKEND.
*/
pool_read(CONNECTION(backend, MAIN_NODE_ID), &len, sizeof(len));
pool_read(CONNECTION(backend, MAIN_NODE_ID), &tstate, sizeof(tstate));
@@ -4317,7 +4321,7 @@ end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * back
static bool
is_panic_or_fatal_error(char *message, int major)
{
- char *str;
+ char *str;
str = extract_error_kind(message, major);
@@ -4335,7 +4339,7 @@ is_panic_or_fatal_error(char *message, int major)
char *
extract_error_kind(char *message, int major)
{
- char *ret = "unknown";
+ char *ret = "unknown";
if (major == PROTO_MAJOR_V3)
{
@@ -4370,7 +4374,7 @@ extract_error_kind(char *message, int major)
}
static int
-detect_postmaster_down_error(POOL_CONNECTION * backend, int major)
+detect_postmaster_down_error(POOL_CONNECTION *backend, int major)
{
int r = extract_message(backend, ADMIN_SHUTDOWN_ERROR_CODE, major, 'E', false);
@@ -4410,7 +4414,7 @@ detect_postmaster_down_error(POOL_CONNECTION * backend, int major)
}
int
-detect_active_sql_transaction_error(POOL_CONNECTION * backend, int major)
+detect_active_sql_transaction_error(POOL_CONNECTION *backend, int major)
{
int r = extract_message(backend, ACTIVE_SQL_TRANSACTION_ERROR_CODE, major, 'E', true);
@@ -4424,7 +4428,7 @@ detect_active_sql_transaction_error(POOL_CONNECTION * backend, int major)
}
int
-detect_deadlock_error(POOL_CONNECTION * backend, int major)
+detect_deadlock_error(POOL_CONNECTION *backend, int major)
{
int r = extract_message(backend, DEADLOCK_ERROR_CODE, major, 'E', true);
@@ -4436,7 +4440,7 @@ detect_deadlock_error(POOL_CONNECTION * backend, int major)
}
int
-detect_serialization_error(POOL_CONNECTION * backend, int major, bool unread)
+detect_serialization_error(POOL_CONNECTION *backend, int major, bool unread)
{
int r = extract_message(backend, SERIALIZATION_FAIL_ERROR_CODE, major, 'E', unread);
@@ -4448,7 +4452,7 @@ detect_serialization_error(POOL_CONNECTION * backend, int major, bool unread)
}
int
-detect_query_cancel_error(POOL_CONNECTION * backend, int major)
+detect_query_cancel_error(POOL_CONNECTION *backend, int major)
{
int r = extract_message(backend, QUERY_CANCEL_ERROR_CODE, major, 'E', true);
@@ -4461,7 +4465,7 @@ detect_query_cancel_error(POOL_CONNECTION * backend, int major)
int
-detect_idle_in_transaction_session_timeout_error(POOL_CONNECTION * backend, int major)
+detect_idle_in_transaction_session_timeout_error(POOL_CONNECTION *backend, int major)
{
int r = extract_message(backend, IDLE_IN_TRANSACTION_SESSION_TIMEOUT_ERROR_CODE, major, 'E', true);
@@ -4473,7 +4477,7 @@ detect_idle_in_transaction_session_timeout_error(POOL_CONNECTION * backend, int
}
int
-detect_idle_session_timeout_error(POOL_CONNECTION * backend, int major)
+detect_idle_session_timeout_error(POOL_CONNECTION *backend, int major)
{
int r = extract_message(backend, IDLE_SESSION_TIMEOUT_ERROR_CODE, major, 'E', true);
@@ -4490,13 +4494,13 @@ detect_idle_session_timeout_error(POOL_CONNECTION * backend, int major)
* throw an ereport for all other errors.
*/
static int
-extract_message(POOL_CONNECTION * backend, char *error_code, int major, char class, bool unread)
+extract_message(POOL_CONNECTION *backend, char *error_code, int major, char class, bool unread)
{
int is_error = 0;
char kind;
int len;
int nlen = 0;
- char *str = NULL;
+ char *str = NULL;
if (pool_read(backend, &kind, sizeof(kind)))
return -1;
@@ -4567,7 +4571,7 @@ extract_message(POOL_CONNECTION * backend, char *error_code, int major, char cla
*/
static bool
-pool_process_notice_message_from_one_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int backend_idx, char kind)
+pool_process_notice_message_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int backend_idx, char kind)
{
int major = MAJOR(backend);
POOL_CONNECTION *backend_conn = CONNECTION(backend, backend_idx);
@@ -4659,7 +4663,7 @@ pool_process_notice_message_from_one_backend(POOL_CONNECTION * frontend, POOL_CO
* -1: error
*/
int
-pool_extract_error_message(bool read_kind, POOL_CONNECTION * backend, int major, bool unread, char **message)
+pool_extract_error_message(bool read_kind, POOL_CONNECTION *backend, int major, bool unread, char **message)
{
char kind;
int ret = 1;
@@ -4764,7 +4768,7 @@ pool_extract_error_message(bool read_kind, POOL_CONNECTION * backend, int major,
* read message kind and rest of the packet then discard it
*/
POOL_STATUS
-pool_discard_packet(POOL_CONNECTION_POOL * cp)
+pool_discard_packet(POOL_CONNECTION_POOL *cp)
{
int i;
char kind;
@@ -4792,7 +4796,7 @@ pool_discard_packet(POOL_CONNECTION_POOL * cp)
* read message length and rest of the packet then discard it
*/
POOL_STATUS
-pool_discard_packet_contents(POOL_CONNECTION_POOL * cp)
+pool_discard_packet_contents(POOL_CONNECTION_POOL *cp)
{
int len,
i;
@@ -4834,7 +4838,8 @@ pool_discard_packet_contents(POOL_CONNECTION_POOL * cp)
/*
* Read packet from either frontend or backend and process it.
*/
-static POOL_STATUS read_packets_and_process(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int reset_request, int *state, short *num_fields, bool *cont)
+static POOL_STATUS
+read_packets_and_process(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int reset_request, int *state, short *num_fields, bool *cont)
{
fd_set readmask;
fd_set writemask;
@@ -4982,7 +4987,7 @@ SELECT_RETRY:
ereport(LOG,
(errmsg("error occurred while reading and processing packets"),
errdetail("FATAL ERROR: VALID_BACKEND returns non 0 but connection slot is empty. backend id:%d RAW_MODE:%d LOAD_BALANCE_STATUS:%d status:%d",
- i, RAW_MODE, LOAD_BALANCE_STATUS(i), BACKEND_INFO(i).backend_status)));
+ i, RAW_MODE, LOAD_BALANCE_STATUS (i), BACKEND_INFO(i).backend_status)));
was_error = 1;
break;
}
@@ -5006,7 +5011,8 @@ SELECT_RETRY:
}
/*
- * connection was terminated due to idle_in_transaction_session_timeout expired
+ * connection was terminated due to
+ * idle_in_transaction_session_timeout expired
*/
r = detect_idle_in_transaction_session_timeout_error(CONNECTION(backend, i), MAJOR(backend));
if (r == SPECIFIED_ERROR)
@@ -5017,7 +5023,8 @@ SELECT_RETRY:
}
/*
- * connection was terminated due to idle_session_timeout expired
+ * connection was terminated due to idle_session_timeout
+ * expired
*/
r = detect_idle_session_timeout_error(CONNECTION(backend, i), MAJOR(backend));
if (r == SPECIFIED_ERROR)
@@ -5167,7 +5174,7 @@ pool_dump_valid_backend(int backend_id)
* Returns true if data was actually pushed.
*/
bool
-pool_push_pending_data(POOL_CONNECTION * backend)
+pool_push_pending_data(POOL_CONNECTION *backend)
{
POOL_SESSION_CONTEXT *session_context;
int len;
@@ -5175,8 +5182,8 @@ pool_push_pending_data(POOL_CONNECTION * backend)
bool pending_data_existed = false;
static char random_statement[] = "pgpool_non_existent";
- int num_pending_messages;
- int num_pushed_messages;
+ int num_pending_messages;
+ int num_pushed_messages;
if (!pool_get_session_context(true) || !pool_is_doing_extended_query_message())
return false;
@@ -5194,7 +5201,8 @@ pool_push_pending_data(POOL_CONNECTION * backend)
* In streaming replication mode, send a Close message for none existing
* prepared statement and flush message before going any further to
* retrieve and save any pending response packet from backend. This
- * ensures that at least "close complete" message is returned from backend.
+ * ensures that at least "close complete" message is returned from
+ * backend.
*
* The saved packets will be popped up before returning to caller. This
* preserves the user's expectation of packet sequence.
@@ -5254,7 +5262,7 @@ pool_push_pending_data(POOL_CONNECTION * backend)
len = ntohl(len);
len -= sizeof(len);
buf = NULL;
- if (len > 0)
+ if (len > 0)
{
buf = palloc(len);
pool_read(backend, buf, len);
diff --git a/src/protocol/pool_proto2.c b/src/protocol/pool_proto2.c
index ffca996c3..bd09a4494 100644
--- a/src/protocol/pool_proto2.c
+++ b/src/protocol/pool_proto2.c
@@ -34,8 +34,8 @@
#include "utils/elog.h"
POOL_STATUS
-AsciiRow(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
+AsciiRow(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
short num_fields)
{
static char nullmap[8192],
@@ -161,8 +161,8 @@ AsciiRow(POOL_CONNECTION * frontend,
}
POOL_STATUS
-BinaryRow(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
+BinaryRow(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
short num_fields)
{
static char nullmap[8192],
@@ -273,8 +273,8 @@ BinaryRow(POOL_CONNECTION * frontend,
}
POOL_STATUS
-CompletedResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+CompletedResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
int i;
char *string = NULL;
@@ -339,8 +339,8 @@ CompletedResponse(POOL_CONNECTION * frontend,
}
POOL_STATUS
-CursorResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+CursorResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
char *string = NULL;
char *string1 = NULL;
@@ -387,8 +387,8 @@ CursorResponse(POOL_CONNECTION * frontend,
}
void
-EmptyQueryResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+EmptyQueryResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
char c;
int i;
@@ -406,8 +406,8 @@ EmptyQueryResponse(POOL_CONNECTION * frontend,
}
POOL_STATUS
-ErrorResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+ErrorResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
char *string = "";
int len = 0;
@@ -448,8 +448,8 @@ ErrorResponse(POOL_CONNECTION * frontend,
}
POOL_STATUS
-FunctionResultResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+FunctionResultResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
char dummy;
int len;
@@ -517,8 +517,8 @@ FunctionResultResponse(POOL_CONNECTION * frontend,
}
void
-NoticeResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+NoticeResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
char *string = NULL;
int len = 0;
@@ -550,8 +550,8 @@ NoticeResponse(POOL_CONNECTION * frontend,
}
POOL_STATUS
-NotificationResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+NotificationResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
int pid,
pid1;
@@ -593,8 +593,8 @@ NotificationResponse(POOL_CONNECTION * frontend,
}
int
-RowDescription(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
+RowDescription(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
short *result)
{
short num_fields,
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index c2802998a..6fad3353c 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -86,27 +86,27 @@ int is_select_for_update = 0; /* 1 if SELECT INTO or SELECT FOR
*/
char query_string_buffer[QUERY_STRING_BUFFER_LEN];
-static int check_errors(POOL_CONNECTION_POOL * backend, int backend_id);
+static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id);
static void generate_error_message(char *prefix, int specific_error, char *query);
-static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
- POOL_SENT_MESSAGE * message,
- POOL_SENT_MESSAGE * bind_message);
-static POOL_STATUS send_prepare(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
- POOL_SENT_MESSAGE * message);
+static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
+ POOL_SENT_MESSAGE *message,
+ POOL_SENT_MESSAGE *bind_message);
+static POOL_STATUS send_prepare(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
+ POOL_SENT_MESSAGE *message);
static int *find_victim_nodes(int *ntuples, int nmembers, int main_node, int *number_of_nodes);
-static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend);
+static POOL_STATUS close_standby_transactions(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend);
static char *flatten_set_variable_args(const char *name, List *args);
static bool
- process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context);
-static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend);
-static void si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node *node, bool tstate_check);
+ process_pg_terminate_backend_func(POOL_QUERY_CONTEXT *query_context);
+static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend);
+static void si_get_snapshot(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, Node *node, bool tstate_check);
-static bool check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend);
+static bool check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static bool multi_statement_query(char *buf);
@@ -137,7 +137,7 @@ static POOL_QUERY_CONTEXT *create_dummy_query_context(void);
*
*/
static bool
-process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context)
+process_pg_terminate_backend_func(POOL_QUERY_CONTEXT *query_context)
{
/*
* locate pg_terminate_backend and get the pid argument, if
@@ -184,8 +184,8 @@ process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context)
* If frontend == NULL, we are called in case of reset queries.
*/
POOL_STATUS
-SimpleQuery(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend, int len, char *contents)
+SimpleQuery(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend, int len, char *contents)
{
static char *sq_config = "pool_status";
static char *sq_pools = "pool_pools";
@@ -249,7 +249,7 @@ SimpleQuery(POOL_CONNECTION * frontend,
* Fetch memory cache if possible
*/
if (pool_config->memory_cache_enabled)
- is_likely_select = pool_is_likely_select(contents);
+ is_likely_select = pool_is_likely_select(contents);
/*
* If memory query cache enabled and the query seems to be a SELECT use
@@ -261,8 +261,8 @@ SimpleQuery(POOL_CONNECTION * frontend,
* transaction, but it will need parsing query and accessing to system
* catalog, which will add significant overhead. Moreover if we are in
* aborted transaction, commands should be ignored, so we should not use
+ * query cache. Also query cache is disabled, we should not fetch from
* query cache.
- * Also query cache is disabled, we should not fetch from query cache.
*/
if (pool_config->memory_cache_enabled && is_likely_select &&
!pool_is_writing_transaction() &&
@@ -308,6 +308,7 @@ SimpleQuery(POOL_CONNECTION * frontend,
else
{
query_context->is_multi_statement = false;
+
/*
* Do not use minimal parser if we are in native replication or
* snapshot isolation mode.
@@ -355,8 +356,8 @@ SimpleQuery(POOL_CONNECTION * frontend,
* were an DELETE command. Note that the DELETE command does not
* execute, instead the original query will be sent to backends,
* which may or may not cause an actual syntax errors. The command
- * will be sent to all backends in replication mode or
- * primary in native replication mode.
+ * will be sent to all backends in replication mode or primary in
+ * native replication mode.
*/
if (!strcmp(remote_host, "[local]"))
{
@@ -438,7 +439,7 @@ SimpleQuery(POOL_CONNECTION * frontend,
(errmsg("DB's oid to discard its cache directory: dboid = %d", query_context->dboid)));
}
}
-
+
/*
* check COPY FROM STDIN if true, set copy_* variable
*/
@@ -669,13 +670,13 @@ SimpleQuery(POOL_CONNECTION * frontend,
struct timeval stime;
stime.tv_usec = 0;
- stime.tv_sec = 5; /* XXX give arbitrary time to allow
- * closing idle connections */
+ stime.tv_sec = 5; /* XXX give arbitrary time to allow closing
+ * idle connections */
ereport(DEBUG1,
(errmsg("Query: sending SIGUSR1 signal to parent")));
- ignore_sigusr1 = 1; /* disable SIGUSR1 handler */
+ ignore_sigusr1 = 1; /* disable SIGUSR1 handler */
close_idle_connections();
/*
@@ -684,7 +685,7 @@ SimpleQuery(POOL_CONNECTION * frontend,
*/
for (;;)
{
- int sts;
+ int sts;
errno = 0;
sts = select(0, NULL, NULL, NULL, &stime);
@@ -824,8 +825,9 @@ SimpleQuery(POOL_CONNECTION * frontend,
/*
* If the query is BEGIN READ WRITE or BEGIN ... SERIALIZABLE
* in streaming replication mode, we send BEGIN to standbys
- * instead. The original_query which is BEGIN READ WRITE is sent
- * to primary. The rewritten_query BEGIN is sent to standbys.
+ * instead. The original_query which is BEGIN READ WRITE is
+ * sent to primary. The rewritten_query BEGIN is sent to
+ * standbys.
*/
if (pool_need_to_treat_as_if_default_transaction(query_context))
{
@@ -898,8 +900,8 @@ SimpleQuery(POOL_CONNECTION * frontend,
}
/*
- * Send "COMMIT" or "ROLLBACK" to only main node if query is
- * "COMMIT" or "ROLLBACK"
+ * Send "COMMIT" or "ROLLBACK" to only main node if query is "COMMIT"
+ * or "ROLLBACK"
*/
if (commit)
{
@@ -930,7 +932,7 @@ SimpleQuery(POOL_CONNECTION * frontend,
* process EXECUTE (V3 only)
*/
POOL_STATUS
-Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
+Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
int len, char *contents)
{
int commit = 0;
@@ -942,7 +944,7 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
POOL_SENT_MESSAGE *bind_msg;
bool foundp = false;
int num_rows;
- char *p;
+ char *p;
/* Get session context */
session_context = pool_get_session_context(false);
@@ -1002,7 +1004,7 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
elog(DEBUG1, "set partial_fetch in execute");
}
elog(DEBUG1, "execute: partial_fetch: %d", query_context->partial_fetch);
-
+
strlcpy(query_string_buffer, query, sizeof(query_string_buffer));
ereport(DEBUG2, (errmsg("Execute: query string = <%s>", query)));
@@ -1017,8 +1019,8 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
/*
* Fetch memory cache if possible. Also if atEnd is false or the execute
- * message has 0 row argument, we maybe able to use cache.
- * If partial_fetch is true, cannot use cache.
+ * message has 0 row argument, we maybe able to use cache. If
+ * partial_fetch is true, cannot use cache.
*/
if (pool_config->memory_cache_enabled && !pool_is_writing_transaction() &&
(TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != 'E')
@@ -1071,10 +1073,10 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
}
/*
- * If bind message is sent again to an existing prepared statement,
- * it is possible that query_w_hex remains. Before setting newly
- * allocated query_w_hex's pointer to the query context, free the
- * previously allocated memory.
+ * If bind message is sent again to an existing prepared
+ * statement, it is possible that query_w_hex remains. Before
+ * setting newly allocated query_w_hex's pointer to the query
+ * context, free the previously allocated memory.
*/
if (query_context->query_w_hex)
{
@@ -1203,8 +1205,8 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
}
/*
- * send "COMMIT" or "ROLLBACK" to only main node if query is
- * "COMMIT" or "ROLLBACK"
+ * send "COMMIT" or "ROLLBACK" to only main node if query is "COMMIT"
+ * or "ROLLBACK"
*/
if (commit)
{
@@ -1245,8 +1247,8 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
* Take care of "writing transaction" flag.
*/
if ((!is_select_query(node, query) || pool_has_function_call(node)) &&
- !is_start_transaction_query(node) &&
- !is_commit_or_rollback_query(node))
+ !is_start_transaction_query(node) &&
+ !is_commit_or_rollback_query(node))
{
ereport(DEBUG1,
(errmsg("Execute: TSTATE:%c",
@@ -1280,7 +1282,7 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
* process Parse (V3 only)
*/
POOL_STATUS
-Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
+Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
int len, char *contents)
{
int deadlock_detected = 0;
@@ -1316,7 +1318,7 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
/* parse SQL string */
MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);
- parse_tree_list = raw_parser(stmt, RAW_PARSE_DEFAULT, strlen(stmt),&error,!REPLICATION);
+ parse_tree_list = raw_parser(stmt, RAW_PARSE_DEFAULT, strlen(stmt), &error, !REPLICATION);
if (parse_tree_list == NIL)
{
@@ -1338,8 +1340,8 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
* were an DELETE command. Note that the DELETE command does not
* execute, instead the original query will be sent to backends,
* which may or may not cause an actual syntax errors. The command
- * will be sent to all backends in replication mode or
- * primary in native replication mode.
+ * will be sent to all backends in replication mode or primary in
+ * native replication mode.
*/
if (!strcmp(remote_host, "[local]"))
{
@@ -1505,9 +1507,9 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
/*
* If the query is BEGIN READ WRITE in main replica mode, we send
- * BEGIN instead of it to standbys. original_query which is
- * BEGIN READ WRITE is sent to primary. rewritten_query which is BEGIN
- * is sent to standbys.
+ * BEGIN instead of it to standbys. original_query which is BEGIN READ
+ * WRITE is sent to primary. rewritten_query which is BEGIN is sent to
+ * standbys.
*/
if (is_start_transaction_query(query_context->parse_tree) &&
is_read_write((TransactionStmt *) query_context->parse_tree) &&
@@ -1518,7 +1520,8 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
}
/*
- * If not in streaming or logical replication mode, send "SYNC" message if not in a transaction.
+ * If not in streaming or logical replication mode, send "SYNC" message if
+ * not in a transaction.
*/
if (!SL_MODE)
{
@@ -1565,10 +1568,10 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
kind)));
}
else
- ereport(ERROR,
- (errmsg("unable to parse the query"),
- errdetail("invalid read kind \"%c\" returned from backend after Sync message sent",
- kind)));
+ ereport(ERROR,
+ (errmsg("unable to parse the query"),
+ errdetail("invalid read kind \"%c\" returned from backend after Sync message sent",
+ kind)));
}
/*
@@ -1687,7 +1690,7 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
}
POOL_STATUS
-Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
+Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
int len, char *contents)
{
char *pstmt_name;
@@ -1786,7 +1789,7 @@ Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T')
{
pool_where_to_send(query_context, query_context->original_query,
- query_context->parse_tree);
+ query_context->parse_tree);
}
/*
@@ -1812,7 +1815,7 @@ Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
&oids);
/* Save to oid buffer */
for (i = 0; i < num_oids; i++)
- pool_add_dml_table_oid(oids[i]);
+ pool_add_dml_table_oid(oids[i]);
}
}
@@ -1883,7 +1886,7 @@ Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
}
POOL_STATUS
-Describe(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
+Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
int len, char *contents)
{
POOL_SENT_MESSAGE *msg;
@@ -1976,7 +1979,7 @@ Describe(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
POOL_STATUS
-Close(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
+Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
int len, char *contents)
{
POOL_SENT_MESSAGE *msg;
@@ -2012,10 +2015,9 @@ Close(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
errmsg("unable to execute close, invalid message")));
/*
- * For PostgreSQL, calling close on non existing portals or
- * statements is not an error. So on the same footings we will ignore all
- * such calls and return the close complete message to clients with out
- * going to backend
+ * For PostgreSQL, calling close on non existing portals or statements is
+ * not an error. So on the same footings we will ignore all such calls and
+ * return the close complete message to clients with out going to backend
*/
if (!msg)
{
@@ -2107,7 +2109,7 @@ Close(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
POOL_STATUS
-FunctionCall3(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
+FunctionCall3(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
int len, char *contents)
{
/*
@@ -2138,8 +2140,8 @@ FunctionCall3(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
* - internal transaction is closed
*/
POOL_STATUS
-ReadyForQuery(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend, bool send_ready, bool cache_commit)
+ReadyForQuery(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend, bool send_ready, bool cache_commit)
{
int i;
int len;
@@ -2189,7 +2191,7 @@ ReadyForQuery(POOL_CONNECTION * frontend,
if (victim_nodes)
{
int i;
- StringInfoData msg;
+ StringInfoData msg;
initStringInfo(&msg);
appendStringInfoString(&msg, "ReadyForQuery: Degenerate backends:");
@@ -2280,7 +2282,7 @@ ReadyForQuery(POOL_CONNECTION * frontend,
/* if (pool_is_query_in_progress() && allow_close_transaction) */
if (REPLICATION && allow_close_transaction)
{
- bool internal_transaction_started = INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID);
+ bool internal_transaction_started = INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID);
/*
* If we are running in snapshot isolation mode and started an
@@ -2332,10 +2334,11 @@ ReadyForQuery(POOL_CONNECTION * frontend,
TSTATE(backend, i) = kind;
ereport(DEBUG5,
(errmsg("processing ReadyForQuery"),
- errdetail("transaction state of node %d '%c'(%02x)", i, kind , kind)));
+ errdetail("transaction state of node %d '%c'(%02x)", i, kind, kind)));
/*
- * The transaction state to be returned to frontend is main node's.
+ * The transaction state to be returned to frontend is main
+ * node's.
*/
if (i == (MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID))
{
@@ -2475,8 +2478,9 @@ ReadyForQuery(POOL_CONNECTION * frontend,
/*
* Close running transactions on standbys.
*/
-static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+static POOL_STATUS
+close_standby_transactions(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
int i;
@@ -2500,7 +2504,7 @@ static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend,
}
POOL_STATUS
-ParseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
+ParseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
POOL_SESSION_CONTEXT *session_context;
@@ -2524,7 +2528,7 @@ ParseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
}
POOL_STATUS
-BindComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
+BindComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
POOL_SESSION_CONTEXT *session_context;
@@ -2548,7 +2552,7 @@ BindComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
}
POOL_STATUS
-CloseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
+CloseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
POOL_SESSION_CONTEXT *session_context;
POOL_STATUS status;
@@ -2622,8 +2626,8 @@ CloseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
}
POOL_STATUS
-ParameterDescription(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+ParameterDescription(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
int len,
len1 = 0;
@@ -2706,8 +2710,8 @@ ParameterDescription(POOL_CONNECTION * frontend,
}
POOL_STATUS
-ErrorResponse3(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+ErrorResponse3(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
POOL_STATUS ret;
@@ -2722,8 +2726,8 @@ ErrorResponse3(POOL_CONNECTION * frontend,
}
POOL_STATUS
-FunctionCall(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+FunctionCall(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
char dummy[2];
int oid;
@@ -2818,8 +2822,8 @@ FunctionCall(POOL_CONNECTION * frontend,
}
POOL_STATUS
-ProcessFrontendResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+ProcessFrontendResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
char fkind;
char *bufp = NULL;
@@ -3077,8 +3081,8 @@ ProcessFrontendResponse(POOL_CONNECTION * frontend,
}
POOL_STATUS
-ProcessBackendResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
+ProcessBackendResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
int *state, short *num_fields)
{
int status = POOL_CONTINUE;
@@ -3226,10 +3230,12 @@ ProcessBackendResponse(POOL_CONNECTION * frontend,
case 'E': /* ErrorResponse */
if (pool_is_doing_extended_query_message())
{
- char *message;
+ char *message;
- /* Log the error message which was possibly missed till
- * a sync message was sent */
+ /*
+ * Log the error message which was possibly missed till a
+ * sync message was sent
+ */
if (pool_extract_error_message(false, MAIN(backend), PROTO_MAJOR_V3,
true, &message) == 1)
{
@@ -3396,8 +3402,8 @@ ProcessBackendResponse(POOL_CONNECTION * frontend,
}
POOL_STATUS
-CopyInResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+CopyInResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
POOL_STATUS status;
@@ -3415,8 +3421,8 @@ CopyInResponse(POOL_CONNECTION * frontend,
}
POOL_STATUS
-CopyOutResponse(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+CopyOutResponse(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
POOL_STATUS status;
@@ -3434,8 +3440,8 @@ CopyOutResponse(POOL_CONNECTION * frontend,
}
POOL_STATUS
-CopyDataRows(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend, int copyin)
+CopyDataRows(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend, int copyin)
{
char *string = NULL;
int len;
@@ -3476,6 +3482,7 @@ CopyDataRows(POOL_CONNECTION * frontend,
copy_count++;
continue;
}
+
/*
* Flush (H) or Sync (S) messages should be ignored while in
* the COPY IN mode.
@@ -3607,7 +3614,7 @@ CopyDataRows(POOL_CONNECTION * frontend,
* transaction state.
*/
void
-raise_intentional_error_if_need(POOL_CONNECTION_POOL * backend)
+raise_intentional_error_if_need(POOL_CONNECTION_POOL *backend)
{
int i;
POOL_SESSION_CONTEXT *session_context;
@@ -3688,7 +3695,7 @@ raise_intentional_error_if_need(POOL_CONNECTION_POOL * backend)
*---------------------------------------------------
*/
static int
-check_errors(POOL_CONNECTION_POOL * backend, int backend_id)
+check_errors(POOL_CONNECTION_POOL *backend, int backend_id)
{
/*
@@ -3760,7 +3767,7 @@ generate_error_message(char *prefix, int specific_error, char *query)
"received query cancel error message from main node. query: %s"
};
- StringInfoData msg;
+ StringInfoData msg;
session_context = pool_get_session_context(true);
if (!session_context)
@@ -3788,15 +3795,15 @@ generate_error_message(char *prefix, int specific_error, char *query)
* Make per DB node statement log
*/
void
-per_node_statement_log(POOL_CONNECTION_POOL * backend, int node_id, char *query)
+per_node_statement_log(POOL_CONNECTION_POOL *backend, int node_id, char *query)
{
- ProcessInfo *pi = pool_get_my_process_info();
+ ProcessInfo *pi = pool_get_my_process_info();
POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
if (pool_config->log_per_node_statement)
ereport(LOG,
(errmsg("DB node id: %d backend pid: %d statement: %s", node_id, ntohl(slot->pid), query)));
-
+
pi_set(node_id);
StrNCpy(pi->statement, query, MAXSTMTLEN);
}
@@ -3807,7 +3814,7 @@ per_node_statement_log(POOL_CONNECTION_POOL * backend, int node_id, char *query)
void
init_pi_set(void)
{
- ProcessInfo *pi = pool_get_my_process_info();
+ ProcessInfo *pi = pool_get_my_process_info();
memset(pi->node_ids, 0, sizeof(pi->node_ids));
pi->statement[0] = '\0';
@@ -3819,7 +3826,7 @@ init_pi_set(void)
void
pi_set(int node_id)
{
- ProcessInfo *pi = pool_get_my_process_info();
+ ProcessInfo *pi = pool_get_my_process_info();
if (node_id < BITS_PER_TYPE(uint64))
pi->node_ids[0] |= (1 << node_id);
@@ -3833,7 +3840,7 @@ pi_set(int node_id)
bool
is_pi_set(uint64 *node_ids, int node_id)
{
- int set;
+ int set;
if (node_id < BITS_PER_TYPE(uint64))
set = node_ids[0] & (1 << node_id);
@@ -3846,7 +3853,7 @@ is_pi_set(uint64 *node_ids, int node_id)
* Make per DB node statement notice message
*/
void
-per_node_statement_notice(POOL_CONNECTION_POOL * backend, int node_id, char *query)
+per_node_statement_notice(POOL_CONNECTION_POOL *backend, int node_id, char *query)
{
if (pool_config->notice_per_node_statement)
ereport(NOTICE,
@@ -3856,56 +3863,57 @@ per_node_statement_notice(POOL_CONNECTION_POOL * backend, int node_id, char *que
/*
* Make backend message log when log_backend_messages is on.
*/
-void log_backend_messages(unsigned char kind, int backend_id)
+void
+log_backend_messages(unsigned char kind, int backend_id)
{
/*
* Map table for message kind and message label
*/
typedef struct
{
- unsigned char kind; /* message kind */
- char *label; /* message label */
- } BackendMessage;
-
+ unsigned char kind; /* message kind */
+ char *label; /* message label */
+ } BackendMessage;
+
static BackendMessage message_label[] =
- {
- {'1', "ParseComplete"},
- {'2', "BindComplete"},
- {'3', "CloseComplete"},
- {'A', "NotificationResponse"},
- {'C', "CommandComplete"},
- {'D', "DataRow"},
- {'E', "ErrorResponse"},
- {'G', "CopyInResponse"},
- {'H', "CopyOutResponse"},
- {'I', "EmptyQueryResponse"},
- {'K', "BackendKeyData"},
- {'N', "NoticeResponse"},
- {'R', "AuthenticationRequest"},
- {'S', "ParameterStatus"},
- {'T', "RowDescription"},
- {'V', "FunctionCallResponse"},
- {'W', "CopyBothResponse"},
- {'Z', "ReadyForQuery"},
- {'n', "NoData"},
- {'s', "PortalSuspended"},
- {'t', "ParameterDescription"},
- {'v', "NegotiateProtocolVersion"},
- {'c', "CopyDone"},
- {'d', "CopyData"},
- };
-
+ {
+ {'1', "ParseComplete"},
+ {'2', "BindComplete"},
+ {'3', "CloseComplete"},
+ {'A', "NotificationResponse"},
+ {'C', "CommandComplete"},
+ {'D', "DataRow"},
+ {'E', "ErrorResponse"},
+ {'G', "CopyInResponse"},
+ {'H', "CopyOutResponse"},
+ {'I', "EmptyQueryResponse"},
+ {'K', "BackendKeyData"},
+ {'N', "NoticeResponse"},
+ {'R', "AuthenticationRequest"},
+ {'S', "ParameterStatus"},
+ {'T', "RowDescription"},
+ {'V', "FunctionCallResponse"},
+ {'W', "CopyBothResponse"},
+ {'Z', "ReadyForQuery"},
+ {'n', "NoData"},
+ {'s', "PortalSuspended"},
+ {'t', "ParameterDescription"},
+ {'v', "NegotiateProtocolVersion"},
+ {'c', "CopyDone"},
+ {'d', "CopyData"},
+ };
+
/* store last kind for each backend */
static unsigned char kind_cache[MAX_NUM_BACKENDS];
/* number of repetitions of each kind */
- static int kind_count[MAX_NUM_BACKENDS];
+ static int kind_count[MAX_NUM_BACKENDS];
- int kind_num = sizeof(message_label)/sizeof(BackendMessage);
- char *label;
- static char *last_label;
- int i;
+ int kind_num = sizeof(message_label) / sizeof(BackendMessage);
+ char *label;
+ static char *last_label;
+ int i;
/* do nothing if log_backend_messages is disabled */
if (pool_config->log_backend_messages == BGMSG_NONE)
@@ -3944,7 +3952,7 @@ void log_backend_messages(unsigned char kind, int backend_id)
(errmsg("%s message from backend %d", label, backend_id)));
return;
}
-
+
/* just to make sure the setting is terse */
if (pool_config->log_backend_messages != BGMSG_TERSE)
{
@@ -3991,11 +3999,11 @@ void log_backend_messages(unsigned char kind, int backend_id)
* All data read in this function is returned to stream.
*/
char
-per_node_error_log(POOL_CONNECTION_POOL * backend, int node_id, char *query, char *prefix, bool unread)
+per_node_error_log(POOL_CONNECTION_POOL *backend, int node_id, char *query, char *prefix, bool unread)
{
POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
char *message;
- char kind;
+ char kind;
pool_read(CONNECTION(backend, node_id), &kind, sizeof(kind));
pool_unread(CONNECTION(backend, node_id), &kind, sizeof(kind));
@@ -4020,10 +4028,11 @@ per_node_error_log(POOL_CONNECTION_POOL * backend, int node_id, char *query, cha
* message is not yet parsed on the primary/main node but parsed on other
* node. Caller must provide the parse message data as "message".
*/
-static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
- POOL_SENT_MESSAGE * message,
- POOL_SENT_MESSAGE * bind_message)
+static POOL_STATUS
+parse_before_bind(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
+ POOL_SENT_MESSAGE *message,
+ POOL_SENT_MESSAGE *bind_message)
{
int i;
int len = message->len;
@@ -4061,8 +4070,8 @@ static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend,
* Before sending the parse message to the primary, we need to
* close the named statement. Otherwise we will get an error from
* backend if the named statement already exists. This could
- * happen if parse_before_bind is called with a bind message
- * using the same named statement. If the named statement does not
+ * happen if parse_before_bind is called with a bind message using
+ * the same named statement. If the named statement does not
* exist, it's fine. PostgreSQL just ignores a request trying to
* close a non-existing statement. If the statement is unnamed
* one, we do not need it because unnamed statement can be
@@ -4105,8 +4114,10 @@ static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend,
bind_message->query_context = new_qc;
#ifdef NOT_USED
+
/*
- * XXX pool_remove_sent_message() will pfree memory allocated by "contents".
+ * XXX pool_remove_sent_message() will pfree memory allocated by
+ * "contents".
*/
/* Remove old sent message */
@@ -4187,14 +4198,16 @@ static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend,
* node. Caller must provide the PREPARED message information as "message"
* argument.
*/
-static POOL_STATUS send_prepare(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
- POOL_SENT_MESSAGE * message)
+static POOL_STATUS
+send_prepare(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
+ POOL_SENT_MESSAGE *message)
{
int node_id;
bool backup[MAX_NUM_BACKENDS];
- POOL_QUERY_CONTEXT *qc, *new_qc;
- char qbuf[1024];
+ POOL_QUERY_CONTEXT *qc,
+ *new_qc;
+ char qbuf[1024];
POOL_SELECT_RESULT *res;
elog(DEBUG1, "send_prepare called");
@@ -4222,14 +4235,14 @@ static POOL_STATUS send_prepare(POOL_CONNECTION * frontend,
}
/*
- * we are in streaming replication mode and the PREPARE message has
- * not been sent to primary yet.
+ * we are in streaming replication mode and the PREPARE message has not
+ * been sent to primary yet.
*/
/*
- * Prepare modified query context This is a copy of original PREPARE
- * query context except the query sending destination is changed to
- * primary node.
+ * Prepare modified query context This is a copy of original PREPARE query
+ * context except the query sending destination is changed to primary
+ * node.
*/
new_qc = pool_query_context_shallow_copy(qc);
memset(new_qc->where_to_send, 0, sizeof(new_qc->where_to_send));
@@ -4242,8 +4255,8 @@ static POOL_STATUS send_prepare(POOL_CONNECTION * frontend,
{
/*
* Before sending the PREPARE message to the primary, we need to
- * DEALLOCATE the named statement. Otherwise we will get an error
- * from backend if an identical named statement already exists.
+ * DEALLOCATE the named statement. Otherwise we will get an error from
+ * backend if an identical named statement already exists.
*/
/* check to see if the named statement exists on primary node */
@@ -4256,6 +4269,7 @@ static POOL_STATUS send_prepare(POOL_CONNECTION * frontend,
if (res && res->data[0] && strcmp(res->data[0], "0"))
{
free_select_result(res);
+
/*
* The same named statement exists, We need to send DEALLOCATE
* message
@@ -4483,7 +4497,7 @@ flatten_set_variable_args(const char *name, List *args)
* Wait till ready for query received.
*/
static void
-pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend)
+pool_wait_till_ready_for_query(POOL_CONNECTION_POOL *backend)
{
char kind;
int len;
@@ -4531,8 +4545,8 @@ pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend)
* is read.
*/
static void
-pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
POOL_PENDING_MESSAGE *pmsg;
int i;
@@ -4641,7 +4655,7 @@ pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,
* Preconditions: query is in progress. The command is succeeded.
*/
void
-pool_at_command_success(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
+pool_at_command_success(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
Node *node;
char *query;
@@ -4769,7 +4783,7 @@ pool_at_command_success(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backe
* read message length (V3 only)
*/
int
-pool_read_message_length(POOL_CONNECTION_POOL * cp)
+pool_read_message_length(POOL_CONNECTION_POOL *cp)
{
int length,
length0;
@@ -4821,7 +4835,7 @@ pool_read_message_length(POOL_CONNECTION_POOL * cp)
* The array is in the static storage, thus it will be destroyed by subsequent calls.
*/
int *
-pool_read_message_length2(POOL_CONNECTION_POOL * cp)
+pool_read_message_length2(POOL_CONNECTION_POOL *cp)
{
int length,
length0;
@@ -4877,7 +4891,7 @@ pool_read_message_length2(POOL_CONNECTION_POOL * cp)
void
pool_emit_log_for_message_length_diff(int *length_array, char *name)
{
- int length0, /* message length of main node id */
+ int length0, /* message length of main node id */
length;
int i;
@@ -4908,7 +4922,7 @@ pool_emit_log_for_message_length_diff(int *length_array, char *name)
* Read kind from all valid backend
*/
signed char
-pool_read_kind(POOL_CONNECTION_POOL * cp)
+pool_read_kind(POOL_CONNECTION_POOL *cp)
{
char kind0,
kind;
@@ -4966,7 +4980,7 @@ pool_read_kind(POOL_CONNECTION_POOL * cp)
}
int
-pool_read_int(POOL_CONNECTION_POOL * cp)
+pool_read_int(POOL_CONNECTION_POOL *cp)
{
int data0,
data;
@@ -5006,7 +5020,7 @@ pool_read_int(POOL_CONNECTION_POOL * cp)
* In case of starting an internal transaction, this should be false.
*/
static void
-si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node * node, bool tstate_check)
+si_get_snapshot(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, Node *node, bool tstate_check)
{
POOL_SESSION_CONTEXT *session_context;
@@ -5015,13 +5029,12 @@ si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node
return;
/*
- * From now on it is possible that query is actually sent to backend.
- * So we need to acquire snapshot while there's no committing backend
- * in snapshot isolation mode except while processing reset queries.
- * For this purpose, we send a query to know whether the transaction
- * is READ ONLY or not. Sending actual user's query is not possible
- * because it might cause rw-conflict, which in turn causes a
- * deadlock.
+ * From now on it is possible that query is actually sent to backend. So
+ * we need to acquire snapshot while there's no committing backend in
+ * snapshot isolation mode except while processing reset queries. For this
+ * purpose, we send a query to know whether the transaction is READ ONLY
+ * or not. Sending actual user's query is not possible because it might
+ * cause rw-conflict, which in turn causes a deadlock.
*/
if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
(!tstate_check || (tstate_check && TSTATE(backend, MAIN_NODE_ID) == 'T')) &&
@@ -5029,16 +5042,17 @@ si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node
!si_snapshot_prepared() &&
frontend && frontend->no_forward == 0)
{
- int i;
+ int i;
si_acquire_snapshot();
for (i = 0; i < NUM_BACKENDS; i++)
{
- static char *si_query = "SELECT current_setting('transaction_read_only')";
+ static char *si_query = "SELECT current_setting('transaction_read_only')";
POOL_SELECT_RESULT *res;
- /* We cannot use VALID_BACKEND macro here because load balance
+ /*
+ * We cannot use VALID_BACKEND macro here because load balance
* node has not been decided yet.
*/
if (!VALID_BACKEND_RAW(i))
@@ -5066,10 +5080,10 @@ si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node
* false to caller.
*/
static bool
-check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend)
+check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend)
{
- int len;
+ int len;
if (TSTATE(backend, MAIN_NODE_ID) != 'E')
return true;
@@ -5087,14 +5101,14 @@ check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION * fro
/* send an error message to frontend */
pool_send_error_message(
- frontend,
- MAJOR(backend),
- "25P02",
- "current transaction is aborted, commands ignored until end of transaction block",
- buf.data,
- "",
- __FILE__,
- __LINE__);
+ frontend,
+ MAJOR(backend),
+ "25P02",
+ "current transaction is aborted, commands ignored until end of transaction block",
+ buf.data,
+ "",
+ __FILE__,
+ __LINE__);
pfree(buf.data);
@@ -5117,14 +5131,15 @@ check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION * fro
* As far as I know this is the most accurate and cheap way.
*/
static
-bool multi_statement_query(char *queries)
+bool
+multi_statement_query(char *queries)
{
PsqlScanState sstate;
promptStatus_t prompt;
PsqlScanResult sr;
PQExpBufferData lbuf;
- int num_semicolons = 0;
- bool done = false;
+ int num_semicolons = 0;
+ bool done = false;
/*
* callback functions for our flex lexer. need this to prevent crash when
@@ -5134,9 +5149,9 @@ bool multi_statement_query(char *queries)
NULL
};
- initPQExpBuffer(&lbuf); /* initialize line buffer */
+ initPQExpBuffer(&lbuf); /* initialize line buffer */
- sstate = psql_scan_create(&psqlscan_callbacks); /* create scan state */
+ sstate = psql_scan_create(&psqlscan_callbacks); /* create scan state */
/* add the query string to the scan state */
psql_scan_setup(sstate, queries, strlen(queries), 0, true);
@@ -5144,9 +5159,9 @@ bool multi_statement_query(char *queries)
for (;;)
{
resetPQExpBuffer(&lbuf);
- sr = psql_scan(sstate, &lbuf, &prompt); /* run scanner */
+ sr = psql_scan(sstate, &lbuf, &prompt); /* run scanner */
- switch(sr)
+ switch (sr)
{
case PSCAN_SEMICOLON: /* found command-ending semicolon */
num_semicolons++;
@@ -5154,7 +5169,8 @@ bool multi_statement_query(char *queries)
case PSCAN_BACKSLASH: /* found backslash command */
break;
case PSCAN_INCOMPLETE: /* end of line, SQL statement incomplete */
- case PSCAN_EOL: /* end of line, SQL possibly complete */
+ case PSCAN_EOL: /* end of line, SQL possibly complete */
+
/*
* If we have already seen ";" and this time something is
* transferred into buffer, we assume that the last query is
@@ -5193,17 +5209,17 @@ bool multi_statement_query(char *queries)
static void
check_prepare(List *parse_tree_list, int len, char *contents)
{
- Node *node;
- RawStmt *rstmt;
- POOL_QUERY_CONTEXT *query_context;
- ListCell *l;
- POOL_SENT_MESSAGE *message;
+ Node *node;
+ RawStmt *rstmt;
+ POOL_QUERY_CONTEXT *query_context;
+ ListCell *l;
+ POOL_SENT_MESSAGE *message;
/* sanity check */
if (list_length(parse_tree_list) <= 1)
return;
- foreach (l, parse_tree_list)
+ foreach(l, parse_tree_list)
{
if (l == list_head(parse_tree_list)) /* skip the first parse tree */
continue;
@@ -5214,14 +5230,16 @@ check_prepare(List *parse_tree_list, int len, char *contents)
if (!IsA(node, PrepareStmt)) /* PREPARE? */
continue;
- query_context = pool_init_query_context(); /* initialize query context */
- query_context->is_multi_statement = true; /* this is a multi statement query */
+ query_context = pool_init_query_context(); /* initialize query
+ * context */
+ query_context->is_multi_statement = true; /* this is a multi
+ * statement query */
pool_start_query(query_context, contents, len, node); /* start query context */
pool_where_to_send(query_context, query_context->original_query, /* set query destination */
query_context->parse_tree);
message = pool_create_sent_message('Q', len, contents, 0, /* create sent message */
((PrepareStmt *) node)->name, query_context);
- pool_add_sent_message(message); /* add it to the sent message list */
+ pool_add_sent_message(message); /* add it to the sent message list */
}
}
@@ -5231,12 +5249,13 @@ check_prepare(List *parse_tree_list, int len, char *contents)
* set.
*/
static
-POOL_QUERY_CONTEXT *create_dummy_query_context(void)
+POOL_QUERY_CONTEXT *
+create_dummy_query_context(void)
{
POOL_QUERY_CONTEXT *query_context;
- Node *node;
+ Node *node;
MemoryContext old_context;
- char *query = "UNKNOWN QUERY";
+ char *query = "UNKNOWN QUERY";
query_context = pool_init_query_context();
old_context = MemoryContextSwitchTo(query_context->memory_context);