1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
#include "pgqd.h"
static void close_retry(struct PgDatabase *db, double sleep_time)
{
log_debug("%s: close_retry, %f", db->name, sleep_time);
pgs_reconnect(db->c_retry, sleep_time);
}
static void run_retry(struct PgDatabase *db)
{
const char *q = "select * from pgq.maint_retry_events()";
log_debug("%s: %s", db->name, q);
pgs_send_query_simple(db->c_retry, q);
}
static void parse_retry(struct PgDatabase *db, PGresult *res)
{
if (PQntuples(res) == 1) {
char *val = PQgetvalue(res, 0, 0);
stats.n_retry += atoi(val);
if (strcmp(val, "0") != 0) {
run_retry(db);
return;
}
}
close_retry(db, cf.retry_period);
}
static void retry_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
{
struct PgDatabase *db = arg;
switch (ev) {
case PGS_CONNECT_OK:
log_debug("%s: starting retry event processing", db->name);
run_retry(db);
break;
case PGS_RESULT_OK:
if (PQresultStatus(res) != PGRES_TUPLES_OK)
close_retry(db, 20);
else
parse_retry(db, res);
break;
case PGS_TIMEOUT:
log_debug("%s: retry timeout", db->name);
launch_retry(db);
break;
default:
log_warning("%s: default reconnect", db->name);
pgs_reconnect(db->c_retry, 30);
}
}
void launch_retry(struct PgDatabase *db)
{
const char *cstr;
if (db->c_retry) {
log_debug("%s: retry already initialized", db->name);
} else {
log_debug("%s: launch_retry", db->name);
cstr = make_connstr(db->name);
db->c_retry = pgs_create(cstr, retry_handler, db);
}
pgs_connect(db->c_retry);
}
|