一、概述
在前面的博文中我们已经分析了 Zookeeper 中的 Zab 选举部分的代码实现,在这篇博文中我们将再通过源码分析一下 Zookeeper 客户端的网络通信实现。
博客内所有文章均为 原创,所有示意图均为 原创,若转载请附原文链接。
二、 Zookeeper 中的 RPC 网络数据结构
2.1 协议数据结构
Zookeeper 中的 RPC 网络协议数据结构概述即包括三个部分:
- 起始的 4 byte(int)用于记录实际数据长度(后接实际数据);
- RequestHeader 请求头 / ResponseHeader 响应头
- Request 请求体 / Response 响应体
且 RequestHeader 主要包括 xid 和 type 两部分,其中 xid 代表请求的顺序号,用于保证请求的顺序发送和接收,而 type 代表请求的类型;而 ResponseHeader 主要包括 xid 和 zxid 以及 err ,其中 xid 的作用与 RequestHeader 中的 xid 作用相同,zxid 表示分布事务 id ,而 err 为记录相关的错误信息的错误码。
2.2 核心数据结构 Packet
static class Packet {
RequestHeader requestHeader; // 请求头信息
ReplyHeader replyHeader; // 响应头信息
Record request; // 请求数据
Record response; // 响应数据
AsyncCallback cb; // 异步回调
Object ctx; // 异步回调所需使用的 context
String clientPath; // 客户端路径视图
String serverPath; // 服务器的路径视图
boolean finished; // 是否已经处理完成
ByteBuffer bb;
public boolean readOnly;
WatchRegistration watchRegistration;
WatchDeregistration watchDeregistration;
// 省略方法逻辑..
}
三、核心源码解析
3.1 建立 Netty 网络连接
// Zookeeper.java
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly,
HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
// 创建连接管理器
cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout,
this, watchManager, getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
首先当我们建立一个 Zookeeper 客户端时需要创建一个 Zookeeper 对象,且在这个 Zookeeper 对象创建的过程中会创建一个客户端连接管理器(ClientCnxn),接着在创建 ClientCnxn 的过程中又需要创建一个 ClientCnxnSocket 用于实现客户端间的通信,所以我们跟进这个 getClientCnxnSocket 方法。
// Zookeeper.java
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
// 从配置文件中获取 ClientCnxnSocket 配置信息
String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
// 如果配置文件中没有提供 ClientCnxnSocket 配置信息则默认使用 NIO
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
// 通过反射获取 ClientCnxnSocket 的构造方法
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
// 通过以客户端配置为入参调用构造方法来创建一个 ClientCnxnSocket 实例
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
// 将创建完成的 ClientCnxnSocket 实例返回
return clientCxnSocket;
}
}
在这个 getClientCnxnSocket 方法中会选择 ClientCnxnSocket 的实现版本,目前的 Zookeeper 中存在两个实现版本,一个是使用 Java JDK 中的 NIO 实现的 ClientCnxnSocketNIO ,另一个是使用 Netty 实现的 ClientCnxnSocketNetty ,而选择的方式优先根据配置文件中的配置进行选择,如果没有进行配置则默认选择 ClientCnxnSocketNIO 实现版本,之后再通过 反射 的方式创建其实例对象。
// ClientCnxnSocketNetty.java
ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
this.clientConfig = clientConfig;
// 创建一个 eventLoopGroup 用于后面对异步请求的处理
// 且因为客户端只有一个 outgoing Socket 因此只需要一个 eventLoopGroup 即可
eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(1 /* nThreads */);
initProperties();
}
public static EventLoopGroup newNioOrEpollEventLoopGroup(int nThreads) {
// 如果 Epoll 可用( Linux )则优先使用 EpollEventLoopGroup 否则使用 NioEventLoopGroup
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(nThreads);
} else {
return new NioEventLoopGroup(nThreads);
}
}
我们这里的分析以 Netty 实现为准,所以选择 ClientCnxnSocketNetty 实现版本,在 ClientCnxnSocketNetty 的构造方法中会选择具体的 EventLoopGroup 的实现,如果是在 Linux 优先选择使用性能更高的 EpollEventLoopGroup 实现,且这里配置的线程数目为一,因此这是典型的 单线程 Reactor 实现。
// Zookeeper.java
protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException {
// 将刚刚创建的 clientCnxnSocket 实例作为入参创建一个 ClientCnxn 对象
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly);
}
// ClientCnxn.java
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
// 省略属性初始化...
// 将刚刚创建的 clientCnxnSocket 实例作为入参创建一个 SendThread 实例
// 这里的 SendThread 和 EventThread 均为 ClientCnxn 的内部类且本质均为一个线程
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
看完 getClientCnxnSocket 方法后我们再回头去看 Zookeeper 构造方法中的 createConnection 方法,可以看到该方法的实质就是创建了一个 ClientCnxn 对象,并在 ClientCnxn 的构造方法中创建了 SendThread 发送线程和 EventThread 事件处理线程。
// ClientCnxn.java
// 该方法在 Zookeeper 构造方法中被调用
public void start() {
// 启动 sendThread 和 eventThread 既调用 SendThread 和 EventThread 线程的 run 方法
sendThread.start();
eventThread.start();
}
当完成 SendThread 和 EventThread 这两个线程的创建和初始化后,在 Zookeeper 的构造方法中最后会通过 cnxn.start() 方法启动这两个线程。
// ClientCnxn.SendThread.java
public void run() {
while (state.isAlive()) {
try {
// 当前尚未与服务端建立连接
if (!clientCnxnSocket.isConnected()) {
// 如果正在关闭则不尝试重连
if (closing) {
break;
}
// 如果存在之前通过 pingRwServer 方法搜索到的可用服务器地址 rwServerAddress 则优先使用它尝试重连
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// 如果不存在 rwServerAddress 则直接更换服务器地址后尝试重连
serverAddress = hostProvider.next(1000);
}
// 传入服务器地址建立连接
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
// 省略已连接逻辑...
}
}
}
// ClientCnxn.SendThread.java
private void startConnect(InetSocketAddress addr) throws IOException {
if(!isFirstConnect){
try {
// 如果不是第一次连接则先让线程睡眠 1000ms 以内的随机时间,防止短时间内过快的不断重连
Thread.sleep(r.nextInt(1000));
}
}
// 设置状态为 CONNECTING
state = States.CONNECTING;
// 调用 ClientCnxnSocket 的 connect 方法尝试连接
clientCnxnSocket.connect(addr);
}
在 SendThread 的 run 方法中会启动初始化连接的流程,并且最终会调用到 ClientCnxnSocketNetty 的 connect 方法来建立客户端网络通信的连接,而 connect 方法中的代码逻辑注释已经描述的比较清楚,所以不做赘述。
// ClientCnxnSocketNetty.java
void connect(InetSocketAddress addr) throws IOException {
firstConnect = new CountDownLatch(1);
// 初始化 Netty 逻辑
Bootstrap bootstrap = new Bootstrap()
.group(eventLoopGroup) // 设置 eventLoopGroup
.channel(NettyUtils.nioOrEpollSocketChannel(