Zookeeper 源码解析——客户端网络通信

一、概述

  在前面的博文中我们已经分析了 Zookeeper 中的 Zab 选举部分的代码实现,在这篇博文中我们将再通过源码分析一下 Zookeeper 客户端的网络通信实现。

  博客内所有文章均为 原创,所有示意图均为 原创,若转载请附原文链接。


二、 Zookeeper 中的 RPC 网络数据结构

2.1 协议数据结构

  Zookeeper 中的 RPC 网络协议数据结构概述即包括三个部分:

  1. 起始的 4 byte(int)用于记录实际数据长度(后接实际数据);
  2. RequestHeader 请求头 / ResponseHeader 响应头
  3. Request 请求体 / Response 响应体

  且 RequestHeader 主要包括 xidtype 两部分,其中 xid 代表请求的顺序号,用于保证请求的顺序发送和接收,而 type 代表请求的类型;而 ResponseHeader 主要包括 xidzxid 以及 err ,其中 xid 的作用与 RequestHeader 中的 xid 作用相同,zxid 表示分布事务 id ,而 err 为记录相关的错误信息的错误码。


2.2 核心数据结构 Packet

static class Packet {
   
	RequestHeader requestHeader;	// 请求头信息
	ReplyHeader replyHeader;		// 响应头信息

	Record request;		// 请求数据
	Record response;	// 响应数据

	AsyncCallback cb;	// 异步回调
    Object ctx;			// 异步回调所需使用的 context

	String clientPath;	// 客户端路径视图
    String serverPath;	// 服务器的路径视图
	boolean finished;	// 是否已经处理完成
    
    ByteBuffer bb;		
    public boolean readOnly;
    WatchRegistration watchRegistration;
    WatchDeregistration watchDeregistration;
	
	// 省略方法逻辑..
}

三、核心源码解析

3.1 建立 Netty 网络连接

// Zookeeper.java
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, 
					HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
   
        
    // 创建连接管理器
	cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, 
				this, watchManager, getClientCnxnSocket(), canBeReadOnly);
	cnxn.start();
}

  首先当我们建立一个 Zookeeper 客户端时需要创建一个 Zookeeper 对象,且在这个 Zookeeper 对象创建的过程中会创建一个客户端连接管理器(ClientCnxn),接着在创建 ClientCnxn 的过程中又需要创建一个 ClientCnxnSocket 用于实现客户端间的通信,所以我们跟进这个 getClientCnxnSocket 方法。

// Zookeeper.java
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
   
	// 从配置文件中获取 ClientCnxnSocket 配置信息
	String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
	// 如果配置文件中没有提供 ClientCnxnSocket 配置信息则默认使用 NIO
	if (clientCnxnSocketName == null) {
   
		clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
	}
	try {
   
		// 通过反射获取 ClientCnxnSocket 的构造方法
		Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
		// 通过以客户端配置为入参调用构造方法来创建一个 ClientCnxnSocket 实例
		ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
		// 将创建完成的 ClientCnxnSocket 实例返回
		return clientCxnSocket;
	}
}

  在这个 getClientCnxnSocket 方法中会选择 ClientCnxnSocket 的实现版本,目前的 Zookeeper 中存在两个实现版本,一个是使用 Java JDK 中的 NIO 实现的 ClientCnxnSocketNIO ,另一个是使用 Netty 实现的 ClientCnxnSocketNetty ,而选择的方式优先根据配置文件中的配置进行选择,如果没有进行配置则默认选择 ClientCnxnSocketNIO 实现版本,之后再通过 反射 的方式创建其实例对象。

// ClientCnxnSocketNetty.java
ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
   
	this.clientConfig = clientConfig;
	
	// 创建一个 eventLoopGroup 用于后面对异步请求的处理
	// 且因为客户端只有一个 outgoing Socket 因此只需要一个 eventLoopGroup 即可
	eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(1 /* nThreads */);
	initProperties();
}

public static EventLoopGroup newNioOrEpollEventLoopGroup(int nThreads) {
   
	// 如果 Epoll 可用( Linux )则优先使用 EpollEventLoopGroup 否则使用 NioEventLoopGroup
	if (Epoll.isAvailable()) {
   
		return new EpollEventLoopGroup(nThreads);
	} else {
   
		return new NioEventLoopGroup(nThreads);
	}
}

  我们这里的分析以 Netty 实现为准,所以选择 ClientCnxnSocketNetty 实现版本,在 ClientCnxnSocketNetty 的构造方法中会选择具体的 EventLoopGroup 的实现,如果是在 Linux 优先选择使用性能更高的 EpollEventLoopGroup 实现,且这里配置的线程数目为一,因此这是典型的 单线程 Reactor 实现。

// Zookeeper.java
protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException {
   
	// 将刚刚创建的 clientCnxnSocket 实例作为入参创建一个 ClientCnxn 对象
	return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly);
}

// ClientCnxn.java
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
   
	// 省略属性初始化...

	// 将刚刚创建的 clientCnxnSocket 实例作为入参创建一个 SendThread 实例
	// 这里的 SendThread 和 EventThread 均为 ClientCnxn 的内部类且本质均为一个线程
	sendThread = new SendThread(clientCnxnSocket);
	eventThread = new EventThread();
	this.clientConfig = zooKeeper.getClientConfig();
	initRequestTimeout();
}

  看完 getClientCnxnSocket 方法后我们再回头去看 Zookeeper 构造方法中的 createConnection 方法,可以看到该方法的实质就是创建了一个 ClientCnxn 对象,并在 ClientCnxn 的构造方法中创建了 SendThread 发送线程和 EventThread 事件处理线程。

// ClientCnxn.java
// 该方法在 Zookeeper 构造方法中被调用
public void start() {
   
	// 启动 sendThread 和 eventThread 既调用 SendThread 和 EventThread 线程的 run 方法
	sendThread.start();
    eventThread.start();
}

  当完成 SendThread 和 EventThread 这两个线程的创建和初始化后,在 Zookeeper 的构造方法中最后会通过 cnxn.start() 方法启动这两个线程。

// ClientCnxn.SendThread.java
public void run() {
   
	while (state.isAlive()) {
   
		try {
   
			// 当前尚未与服务端建立连接
			if (!clientCnxnSocket.isConnected()) {
   
         		// 如果正在关闭则不尝试重连
				if (closing) {
   
					break;
				}
				// 如果存在之前通过 pingRwServer 方法搜索到的可用服务器地址 rwServerAddress 则优先使用它尝试重连
				if (rwServerAddress != null) {
   
					serverAddress = rwServerAddress;
					rwServerAddress = null;
				} else {
   
					// 如果不存在 rwServerAddress 则直接更换服务器地址后尝试重连
					serverAddress = hostProvider.next(1000);
				}
				// 传入服务器地址建立连接
				startConnect(serverAddress);
				clientCnxnSocket.updateLastSendAndHeard();
			}
			// 省略已连接逻辑...
		}
	}
}

// ClientCnxn.SendThread.java
private void startConnect(InetSocketAddress addr) throws IOException {
   
    if(!isFirstConnect){
   
	    try {
   
	    	// 如果不是第一次连接则先让线程睡眠 1000ms 以内的随机时间,防止短时间内过快的不断重连
			Thread.sleep(r.nextInt(1000));
		} 
	}
	// 设置状态为 CONNECTING
	state = States.CONNECTING;
	// 调用 ClientCnxnSocket 的 connect 方法尝试连接
	clientCnxnSocket.connect(addr);
}

  在 SendThread 的 run 方法中会启动初始化连接的流程,并且最终会调用到 ClientCnxnSocketNetty 的 connect 方法来建立客户端网络通信的连接,而 connect 方法中的代码逻辑注释已经描述的比较清楚,所以不做赘述。

// ClientCnxnSocketNetty.java
void connect(InetSocketAddress addr) throws IOException {
   
	firstConnect = new CountDownLatch(1);

	// 初始化 Netty 逻辑
	Bootstrap bootstrap = new Bootstrap()
			.group(eventLoopGroup)	// 设置 eventLoopGroup
            .channel(NettyUtils.nioOrEpollSocketChannel(
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值