转载地址:http://www.blogjava.net/alex-zheng/articles/339908.html
在看完了server端的启动之后,再来看client端的启动过程是怎么进行的。例子是TelnetServer对应的TelnetClient。
直接看connect方法
/**
 * Opens a new channel and asks it to connect to {@code remoteAddress},
 * binding it to {@code localAddress} first when one is supplied.
 *
 * <p>A fresh pipeline is obtained from the pipeline factory, a channel is
 * created on top of it by the channel factory, and the bootstrap options are
 * applied to the channel configuration before any bind/connect request is
 * issued.
 *
 * @param remoteAddress the address to connect to; must not be {@code null}
 * @param localAddress  the local address to bind to, or {@code null} to skip
 *                      the explicit bind
 * @return the future returned by {@code ch.connect(remoteAddress)}
 * @throws NullPointerException     if {@code remoteAddress} is {@code null}
 * @throws ChannelPipelineException if the pipeline factory fails to build a
 *                                  pipeline
 */
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException(" remoteAddress ");
    }

    ChannelPipeline pipeline;
    try {
        pipeline = getPipelineFactory().getPipeline();
    } catch (Exception e) {
        throw new ChannelPipelineException(" Failed to initialize a pipeline. ", e);
    }

    // Article note: the NioClientSocketChannel constructor fires channelOpen.
    // The upstream handlers installed by TelnetClientPipelineFactory do not
    // override channelOpen, so the event is simply passed along the pipeline.
    Channel ch = getFactory().newChannel(pipeline);

    // Set the options. The channel already exists at this point, so if
    // applying the options throws we must close it; otherwise the caller never
    // receives a reference and the channel would be leaked.
    boolean optionsApplied = false;
    try {
        ch.getConfig().setOptions(getOptions());
        optionsApplied = true;
    } finally {
        if (!optionsApplied) {
            ch.close();
        }
    }

    // Bind.
    if (localAddress != null) {
        ch.bind(localAddress);
    }

    // Connect.
    return ch.connect(remoteAddress);
}
然后执行ch.connect(remoteAddress);
这里是NioClientSocketChannel-->NioSocketChannel-->AbstractChannel
/**
 * Requests a connection of this channel to {@code remoteAddress} by
 * delegating to {@code Channels.connect(this, remoteAddress)}.
 *
 * @param remoteAddress the address to connect to
 * @return the future produced by {@code Channels.connect}
 */
public ChannelFuture connect(SocketAddress remoteAddress) {
    final ChannelFuture connectFuture = Channels.connect(this, remoteAddress);
    return connectFuture;
}
/**
 * Sends a downstream {@code CONNECTED} state event carrying
 * {@code remoteAddress} through the pipeline of {@code channel}.
 *
 * @param channel       the channel whose pipeline receives the event
 * @param remoteAddress the address to connect to; must not be {@code null}
 * @return the future created for this request, which is also attached to
 *         the event that is sent downstream
 * @throws NullPointerException if {@code remoteAddress} is {@code null}
 */
public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
    if (null == remoteAddress) {
        throw new NullPointerException(" remoteAddress ");
    }

    // The same future instance is handed to the event and returned to the caller.
    final ChannelFuture connectFuture = future(channel, true);
    channel.getPipeline().sendDownstream(
            new DownstreamChannelStateEvent(
                    channel, connectFuture, ChannelState.CONNECTED, remoteAddress));
    return connectFuture;
}
从TelnetClientPipelineFactory的pipeline中由下往上传递CONNECTED事件,这里只有一个StringEncoder-->OneToOneEncoder,其handleDownstream方法对该事件不做处理,往上传递该事件,执行DefaultChannelHandlerContext.sendDownstream
/**
 * Forwards {@code e} to the next downstream context before this one, or to
 * the pipeline sink when no such context exists.
 *
 * <p>Any {@code Throwable} raised by the sink is routed to
 * {@code notifyHandlerException} instead of propagating to the caller.
 *
 * @param e the event to send downstream
 */
public void sendDownstream(ChannelEvent e) {
    // Article note: in the Telnet client example there is no further
    // downstream handler before StringEncoder, so the lookup yields null.
    DefaultChannelHandlerContext target = getActualDownstreamContext(this.prev);

    if (target != null) {
        DefaultChannelPipeline.this.sendDownstream(target, e);
        return;
    }

    // No remaining downstream handler: hand the event to the sink.
    try {
        getSink().eventSunk(DefaultChannelPipeline.this, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
执行NioClientSocketPipelineSink.eventSunk,其中会执行下面的connect方法
/**
 * Starts connecting {@code channel} to {@code remoteAddress}.
 *
 * <p>When {@code channel.socket.connect} reports {@code true} the channel is
 * registered with its worker right away. Otherwise the connect future is
 * stored on the channel and the channel is registered with the boss, which
 * (per the article) completes the connection on the boss thread.
 *
 * <p>Any {@code Throwable} fails {@code cf}, fires an exception-caught event
 * and closes the channel through its worker.
 *
 * @param channel       the client channel being connected
 * @param cf            the future to complete for this connect request
 * @param remoteAddress the address to connect to
 */
private void connect(
        final NioClientSocketChannel channel, final ChannelFuture cf,
        SocketAddress remoteAddress) {
    try {
        if (channel.socket.connect(remoteAddress)) {
            // Connected immediately: let the worker take over read/write handling.
            channel.worker.register(channel, cf);
            return;
        }

        // Connection still pending. If the channel gets closed before the
        // connect future completes, mark that future as failed.
        final ChannelFutureListener failIfClosedEarly = new ChannelFutureListener() {
            public void operationComplete(ChannelFuture closeFuture) throws Exception {
                if (!cf.isDone()) {
                    cf.setFailure(new ClosedChannelException());
                }
            }
        };
        channel.getCloseFuture().addListener(failIfClosedEarly);

        cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        channel.connectFuture = cf;
        boss.register(channel);
    } catch (Throwable t) {
        cf.setFailure(t);
        fireExceptionCaught(channel, t);
        channel.worker.close(channel, succeededFuture(channel));
    }
}
执行boss.register,在boss线程中确保该channel连接成功,这里会启动boss线程
/**
 * Queues a {@code RegisterTask} for {@code channel} and wakes the selector
 * up if the {@code wakenUp} flag could be flipped from false to true.
 *
 * NOTE(review): the braces in this excerpt are unbalanced and
 * {@code selector} is never declared here, so part of the original method
 * appears to have been elided by the article (presumably the block that
 * starts the boss thread) -- confirm against the Netty source.
 *
 * @param channel the client channel whose connect should be completed
 */
void
register(NioClientSocketChannel channel) {
// Per the article: RegisterTask.run registers SelectionKey.OP_CONNECT.
Runnable registerTask = new RegisterTask( this , channel);

boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
// Only wake the selector when this call is the one that sets the flag.
if (wakenUp.compareAndSet( false , true )) {
selector.wakeup();
}
}
最后启动boss.run,其中processSelectedKeys里执行connect
/**
 * Finishes a pending connect for the channel attached to {@code k}.
 *
 * <p>Once {@code finishConnect()} reports success the key is cancelled and
 * the channel is registered with its worker together with the stored connect
 * future. Any {@code Throwable} fails that future, fires an exception-caught
 * event and closes the channel through its worker.
 *
 * @param k the selection key whose attachment is the connecting channel
 */
private void connect(SelectionKey k) {
    NioClientSocketChannel channel = (NioClientSocketChannel) k.attachment();
    try {
        if (!channel.socket.finishConnect()) {
            // Not connected yet; leave the key in place.
            return;
        }
        k.cancel();
        // Connected: per the article, only now is the channel handed to the
        // worker that will handle its reads and writes.
        channel.worker.register(channel, channel.connectFuture);
    } catch (Throwable t) {
        channel.connectFuture.setFailure(t);
        fireExceptionCaught(channel, t);
        channel.worker.close(channel, succeededFuture(channel));
    }
}
之后就是交给nioworker线程来进行数据的发送和接收了。