summaryrefslogtreecommitdiff
path: root/src/gtm/recovery/replication.c
blob: bc04b191db8ed905866667f36ea2866e72de3714 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*-------------------------------------------------------------------------
 *
 * replication.c
 *  Controlling the initialization and end of replication process of GTM data
 *
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 * Portions Copyright (c) 2010-2012 Postgres-XC Development Group
 *
 *
 * IDENTIFICATION
 *	  src/gtm/recovery/replication.c
 *
 *-------------------------------------------------------------------------
 */
#include "gtm/replication.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include "gtm/gtm_c.h"
#include "gtm/gtm.h"
#include "gtm/gtm_txn.h"
#include "gtm/standby_utils.h"
#include "gtm/gtm_standby.h"
#include "gtm/register.h"
#include "gtm/assert.h"
#include <stdio.h>
#include "gtm/libpq.h"
#include "gtm/pqformat.h"
#include "gtm/gtm_msg.h"
#include "gtm/gtm_ip.h"

/*
 * Process MSG_NODE_BEGIN_REPLICATION_INIT
 */
void
ProcessBeginReplicationInitialSyncRequest(Port *myport, StringInfo message)
{
	StringInfoData buf;
	MemoryContext oldContext;

	pq_getmsgend(message);

	if (Recovery_IsStandby())
		ereport(ERROR,
			(EPERM,
			 errmsg("Operation not permitted under the standby mode.")));

	oldContext = MemoryContextSwitchTo(TopMemoryContext);

	/* Acquire global locks to copy resource data to the standby. */
	GTM_RWLockAcquire(&GTMTransactions.gt_XidGenLock, GTM_LOCKMODE_WRITE);
	GTM_RWLockAcquire(&GTMTransactions.gt_TransArrayLock, GTM_LOCKMODE_WRITE);
	elog(DEBUG1, "Prepared for copying data with holding XidGenLock and TransArrayLock.");

	MemoryContextSwitchTo(oldContext);

	pq_beginmessage(&buf, 'S');
	pq_sendint(&buf, NODE_BEGIN_REPLICATION_INIT_RESULT, 4);
	if (myport->remote_type == GTM_NODE_GTM_PROXY)
	{
		GTM_ProxyMsgHeader proxyhdr;
		proxyhdr.ph_conid = myport->conn_id;
		pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
	}
	pq_endmessage(myport, &buf);

	/*
	 * Beause this command comes from the standby, we don't have to flush
	 * messages to the standby here.
	 */
	if (myport->remote_type != GTM_NODE_GTM_PROXY)
		pq_flush(myport);

	elog(DEBUG1, "ProcessBeginReplicationInitialSyncRequest() ok.");

	return;
}

/*
 * Process MSG_NODE_END_REPLICATION_INIT
 */
void
ProcessEndReplicationInitialSyncRequest(Port *myport, StringInfo message)
{
	StringInfoData buf;
	MemoryContext oldContext;

	pq_getmsgend(message);

	if (Recovery_IsStandby())
		ereport(ERROR,
			(EPERM,
			 errmsg("Operation not permitted under the standby mode.")));

	oldContext = MemoryContextSwitchTo(TopMemoryContext);

	/*
	 * Release global locks after copying resource data to the standby.
	 */
	GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
	GTM_RWLockRelease(&GTMTransactions.gt_XidGenLock);
	elog(DEBUG1, "XidGenLock and TransArrayLock released.");

	MemoryContextSwitchTo(oldContext);

	pq_beginmessage(&buf, 'S');
	pq_sendint(&buf, NODE_END_REPLICATION_INIT_RESULT, 4);
	if (myport->remote_type == GTM_NODE_GTM_PROXY)
	{
		GTM_ProxyMsgHeader proxyhdr;
		proxyhdr.ph_conid = myport->conn_id;
		pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
	}
	pq_endmessage(myport, &buf);

	/*
	 * Beause this command comes from the standby, we don't have to flush
	 * messages to the standby here.
	 */
	if (myport->remote_type != GTM_NODE_GTM_PROXY)
		pq_flush(myport);

	elog(DEBUG1, "ProcessEndReplicationInitialSyncRequest() ok.");

	return;
}