summaryrefslogtreecommitdiff
path: root/src/gtm/main/replication.c
blob: a20acb5cd7fa7d2e5e25f72b5ac57b487557ca33 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*-------------------------------------------------------------------------
 *
 * replication.c
 *  Controlling the initialization and end of replication process of GTM data
 *
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 * Portions Copyright (c) 2010-2012 Postgres-XC Development Group
 *
 *
 * IDENTIFICATION
 *	  src/gtm/recovery/replication.c
 *
 *-------------------------------------------------------------------------
 */
#include "gtm/replication.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include "gtm/gtm_c.h"
#include "gtm/gtm.h"
#include "gtm/gtm_txn.h"
#include "gtm/standby_utils.h"
#include "gtm/gtm_standby.h"
#include "gtm/register.h"
#include "gtm/assert.h"
#include <stdio.h>
#include "gtm/libpq.h"
#include "gtm/pqformat.h"
#include "gtm/gtm_msg.h"
#include "gtm/gtm_ip.h"

/*
 * Process MSG_NODE_BEGIN_REPLICATION_INIT
 */
void
ProcessBeginReplicationInitialSyncRequest(Port *myport, StringInfo message)
{
	StringInfoData buf;
	MemoryContext oldContext;

	pq_getmsgend(message);

	if (Recovery_IsStandby())
		ereport(ERROR,
			(EPERM,
			 errmsg("Operation not permitted under the standby mode.")));

	oldContext = MemoryContextSwitchTo(TopMemoryContext);

	/* Acquire global locks to copy resource data to the standby. */

	/*
	 * XXX Weird locking semantics.. the locks are released in
	 * ProcessEndReplicationInitialSyncRequest()
	 */
	GTM_RWLockAcquire(&GTMTransactions.gt_XidGenLock, GTM_LOCKMODE_WRITE);
	GTM_RWLockAcquire(&GTMTransactions.gt_TransArrayLock, GTM_LOCKMODE_WRITE);
	elog(LOG, "Prepared for copying data with holding XidGenLock and TransArrayLock.");

	MemoryContextSwitchTo(oldContext);

	pq_beginmessage(&buf, 'S');
	pq_sendint(&buf, NODE_BEGIN_REPLICATION_INIT_RESULT, 4);
	if (myport->remote_type == GTM_NODE_GTM_PROXY)
	{
		GTM_ProxyMsgHeader proxyhdr;
		proxyhdr.ph_conid = myport->conn_id;
		pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
	}
	pq_endmessage(myport, &buf);

	/*
	 * Beause this command comes from the standby, we don't have to flush
	 * messages to the standby here.
	 */
	if (myport->remote_type != GTM_NODE_GTM_PROXY)
		pq_flush(myport);

	elog(LOG, "ProcessBeginReplicationInitialSyncRequest() ok.");

	return;
}

/*
 * Process MSG_NODE_END_REPLICATION_INIT
 */
void
ProcessEndReplicationInitialSyncRequest(Port *myport, StringInfo message)
{
	StringInfoData buf;
	MemoryContext oldContext;

	pq_getmsgend(message);

	if (Recovery_IsStandby())
		ereport(ERROR,
			(EPERM,
			 errmsg("Operation not permitted under the standby mode.")));

	oldContext = MemoryContextSwitchTo(TopMemoryContext);

	/*
	 * Release global locks after copying resource data to the standby.
	 */
	GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
	GTM_RWLockRelease(&GTMTransactions.gt_XidGenLock);
	elog(LOG, "XidGenLock and TransArrayLock released.");

	MemoryContextSwitchTo(oldContext);

	pq_beginmessage(&buf, 'S');
	pq_sendint(&buf, NODE_END_REPLICATION_INIT_RESULT, 4);
	if (myport->remote_type == GTM_NODE_GTM_PROXY)
	{
		GTM_ProxyMsgHeader proxyhdr;
		proxyhdr.ph_conid = myport->conn_id;
		pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
	}
	pq_endmessage(myport, &buf);

	/*
	 * Beause this command comes from the standby, we don't have to flush
	 * messages to the standby here.
	 */
	if (myport->remote_type != GTM_NODE_GTM_PROXY)
		pq_flush(myport);

	elog(LOG, "ProcessEndReplicationInitialSyncRequest() ok.");

	return;
}