1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
/*-------------------------------------------------------------------------
*
* replication.c
* Controlling the initialization and end of replication process of GTM data
*
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2010-2012 Postgres-XC Development Group
*
*
* IDENTIFICATION
* src/gtm/recovery/replication.c
*
*-------------------------------------------------------------------------
*/
#include "gtm/replication.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include "gtm/gtm_c.h"
#include "gtm/gtm.h"
#include "gtm/gtm_txn.h"
#include "gtm/standby_utils.h"
#include "gtm/gtm_standby.h"
#include "gtm/register.h"
#include "gtm/assert.h"
#include <stdio.h>
#include "gtm/libpq.h"
#include "gtm/pqformat.h"
#include "gtm/gtm_msg.h"
#include "gtm/gtm_ip.h"
/*
 * Process MSG_NODE_BEGIN_REPLICATION_INIT
 *
 * Acquires GTMTransactions.gt_XidGenLock and GTMTransactions.gt_TransArrayLock
 * in write mode and replies to the requester with
 * NODE_BEGIN_REPLICATION_INIT_RESULT.
 *
 * Both locks are still held when this function returns; they are released by
 * ProcessEndReplicationInitialSyncRequest().
 *
 * myport  - connection the request arrived on and the reply is written to.
 * message - the incoming request; no fields are read from it.
 *
 * Reports an ERROR (EPERM) when this GTM is itself running as a standby.
 */
void
ProcessBeginReplicationInitialSyncRequest(Port *myport, StringInfo message)
{
	StringInfoData buf;
	MemoryContext oldContext;

	pq_getmsgend(message);

	if (Recovery_IsStandby())
		ereport(ERROR,
				(EPERM,
				 errmsg("Operation not permitted under the standby mode.")));

	oldContext = MemoryContextSwitchTo(TopMemoryContext);

	/* Acquire global locks to copy resource data to the standby. */
	GTM_RWLockAcquire(&GTMTransactions.gt_XidGenLock, GTM_LOCKMODE_WRITE);
	GTM_RWLockAcquire(&GTMTransactions.gt_TransArrayLock, GTM_LOCKMODE_WRITE);

	elog(DEBUG1, "Prepared for copying data with holding XidGenLock and TransArrayLock.");

	MemoryContextSwitchTo(oldContext);

	/* Build the reply: result code, then a proxy header if the peer is a proxy. */
	pq_beginmessage(&buf, 'S');
	pq_sendint(&buf, NODE_BEGIN_REPLICATION_INIT_RESULT, 4);
	if (myport->remote_type == GTM_NODE_GTM_PROXY)
	{
		GTM_ProxyMsgHeader proxyhdr;

		proxyhdr.ph_conid = myport->conn_id;
		pq_sendbytes(&buf, (char *) &proxyhdr, sizeof(GTM_ProxyMsgHeader));
	}
	pq_endmessage(myport, &buf);

	/*
	 * Because this command comes from the standby, there is nothing to
	 * forward to a standby here.  The reply itself is flushed right away
	 * unless the peer is a GTM proxy.
	 */
	if (myport->remote_type != GTM_NODE_GTM_PROXY)
		pq_flush(myport);

	elog(DEBUG1, "ProcessBeginReplicationInitialSyncRequest() ok.");

	return;
}
/*
 * Process MSG_NODE_END_REPLICATION_INIT
 *
 * Releases GTMTransactions.gt_TransArrayLock and GTMTransactions.gt_XidGenLock
 * (the locks taken by ProcessBeginReplicationInitialSyncRequest(), released
 * here in the reverse order) and replies to the requester with
 * NODE_END_REPLICATION_INIT_RESULT.
 *
 * myport  - connection the request arrived on and the reply is written to.
 * message - the incoming request; no fields are read from it.
 *
 * Reports an ERROR (EPERM) when this GTM is itself running as a standby.
 *
 * NOTE(review): nothing here verifies that the locks are actually held by the
 * caller; this presumably relies on a preceding BEGIN request -- confirm.
 */
void
ProcessEndReplicationInitialSyncRequest(Port *myport, StringInfo message)
{
	StringInfoData buf;
	MemoryContext oldContext;

	pq_getmsgend(message);

	if (Recovery_IsStandby())
		ereport(ERROR,
				(EPERM,
				 errmsg("Operation not permitted under the standby mode.")));

	oldContext = MemoryContextSwitchTo(TopMemoryContext);

	/*
	 * Release global locks after copying resource data to the standby.
	 */
	GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
	GTM_RWLockRelease(&GTMTransactions.gt_XidGenLock);

	elog(DEBUG1, "XidGenLock and TransArrayLock released.");

	MemoryContextSwitchTo(oldContext);

	/* Build the reply: result code, then a proxy header if the peer is a proxy. */
	pq_beginmessage(&buf, 'S');
	pq_sendint(&buf, NODE_END_REPLICATION_INIT_RESULT, 4);
	if (myport->remote_type == GTM_NODE_GTM_PROXY)
	{
		GTM_ProxyMsgHeader proxyhdr;

		proxyhdr.ph_conid = myport->conn_id;
		pq_sendbytes(&buf, (char *) &proxyhdr, sizeof(GTM_ProxyMsgHeader));
	}
	pq_endmessage(myport, &buf);

	/*
	 * Because this command comes from the standby, there is nothing to
	 * forward to a standby here.  The reply itself is flushed right away
	 * unless the peer is a GTM proxy.
	 */
	if (myport->remote_type != GTM_NODE_GTM_PROXY)
		pq_flush(myport);

	elog(DEBUG1, "ProcessEndReplicationInitialSyncRequest() ok.");

	return;
}
|