[Netty Source] Server Startup Flow Source Code Analysis, Part 1

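1. Creating a server with native Java NIO

With plain Java NIO, creating a server usually means creating two objects, a Channel and a Selector, then registering the Channel with the Selector and binding it to a port, and finally polling the Selector for events.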

        // Step 1: create the Channel and switch it to non-blocking mode
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        // Step 2: create the Selector
        Selector selector = Selector.open();
        // Step 3: register the Channel with the Selector (interest set 0, plus an attachment)
        SelectionKey selectionKey = channel.register(selector, 0, new Object());
        // Step 4: bind the Channel to the listening port
        channel.bind(new InetSocketAddress(8080));
        // Step 5: declare interest in accept events
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
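
The five steps above stop short of the polling loop. The following is a minimal JDK-only sketch of the whole flow, including one round of polling. The class name NioAcceptSketch, the use of an ephemeral loopback port, and the 5 second timeout are assumptions made for the demo, not part of the original code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class NioAcceptSketch {

    /** Runs the five steps, connects one loopback client, and returns how many connections were accepted. */
    static int acceptOnce() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Register with an empty interest set first, as in step 3.
            SelectionKey key = server.register(selector, 0, new Object());
            // Port 0 asks the OS for a free port (demo assumption; the article uses 8080).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            key.interestOps(SelectionKey.OP_ACCEPT);

            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            int accepted = 0;
            try (SocketChannel client = SocketChannel.open(
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), port))) {
                // Polling loop: wait until the accept event is reported.
                while (accepted == 0) {
                    if (selector.select(5000) == 0) {
                        break; // timed out without any ready key
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey ready = it.next();
                        it.remove();
                        if (ready.isAcceptable()) {
                            SocketChannel peer = ((ServerSocketChannel) ready.channel()).accept();
                            if (peer != null) {
                                peer.close();
                                accepted++;
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted connections: " + acceptOnce());
    }
}
```

Netty performs the same steps, only spread across several classes, which is what the rest of this article traces.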

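2. Creating a server with Netty

Netty's whole server creation flow is driven by ServerBootstrap.bind(), which is inherited from AbstractBootstrap and ends up in AbstractBootstrap#doBind: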

private ChannelFuture doBind(final SocketAddress localAddress) {
    // Create the channel, initialize it, and register it with an EventLoop
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // Registration may not have finished yet, because register is handed off to the NIO event loop.
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // Go on to bind the channel to localAddress (registration with the Selector already happened in initAndRegister)
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // Wait until registration completes, then run the bind from the listener
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

2.1 initAndRegister(): initialize the Channel and register it with a Selector

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // Create the Channel through the factory
            channel = channelFactory.newChannel();
            // Initialize the Channel
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // Start registration
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

The stack trace shows that this method is called on the main thread.

[Figure image-20240310111903103: call stack of initAndRegister()]

Stepping through in the debugger shows that channelFactory.newChannel() resolves to io.netty.channel.ReflectiveChannelFactory#newChannel, which creates the Channel reflectively through its no-arg constructor.

    public T newChannel() {
        try {
            // Create the channel reflectively
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

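ChannelFactory is an interface with a single abstract method, newChannel(). The factory is assigned when ServerBootstrap.channel(...) is called.

io.netty.bootstrap.AbstractBootstrap#channel

[Figure image-20240310113115988: source of AbstractBootstrap#channel]

io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>)

[Figure image-20240310113130755: source of AbstractBootstrap#channelFactory]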
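
To make the reflective factory idea concrete without pulling in Netty, here is a small JDK-only sketch. ReflectiveFactorySketch is a hypothetical stand-in for ReflectiveChannelFactory: it looks up the public no-arg constructor once and calls it on every newInstance(). The error messages are illustrative.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            // Resolve the public no-arg constructor once, up front.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + clazz.getName() + " does not have a public no-arg constructor", e);
        }
    }

    /** Creates a fresh instance on every call, as a channel factory would create a fresh Channel. */
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance from " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<StringBuilder> factory =
                new ReflectiveFactorySketch<>(StringBuilder.class);
        StringBuilder first = factory.newInstance();
        StringBuilder second = factory.newInstance();
        System.out.println("distinct instances: " + (first != second));
    }
}
```

Resolving the constructor at configuration time means a class without a usable constructor fails early, when the factory is built, rather than at bind time.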

When the no-arg constructor of NioServerSocketChannel is invoked, it calls newSocket:

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

DEFAULT_SELECTOR_PROVIDER is initialized from SelectorProvider.provider(); the concrete SelectorProvider implementation differs from platform to platform.

The Channel is ultimately created by provider.openServerSocketChannel().

io.netty.channel.socket.nio.NioServerSocketChannel#newSocket

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

[Figure image-20240310152014731]

As this shows, Netty does the same thing here as native Java NIO does.
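
The equivalence can be checked with the JDK alone. The sketch below (ProviderSketch is a hypothetical name) opens one channel through ServerSocketChannel.open() and one through the system-wide provider, then compares the provider and implementation class of both.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class ProviderSketch {

    /** Returns true if both ways of opening a server channel go through the same provider. */
    static boolean sameProvider() {
        SelectorProvider provider = SelectorProvider.provider();
        try (ServerSocketChannel viaOpen = ServerSocketChannel.open();
             ServerSocketChannel viaProvider = provider.openServerSocketChannel()) {
            return viaOpen.provider() == provider
                    && viaProvider.provider() == provider
                    && viaOpen.getClass() == viaProvider.getClass();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("same provider and implementation: " + sameProvider());
    }
}
```

Holding on to the provider, as the Netty comment in newSocket explains, avoids going through SelectorProvider.provider() again for every channel that is opened.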

2.1.1 init(): initialize the Channel and add the ServerBootstrapAcceptor handler

    @Override
    void init(Channel channel) {
        // Apply the configured options and attributes; skipped for now, covered later
        setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
        setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));

        // The Channel already exists at this point; get its pipeline so handlers can be added
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions =
                childOptions.entrySet().toArray(newOptionArray(0));
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        // ChannelInitializer is a one-shot initialization handler:
        // it adds a ServerBootstrapAcceptor handler and then removes itself from the pipeline.
        // ServerBootstrapAcceptor: initializes each connection accepted from a client.
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                // Add the handler configured through ServerBootstrap.handler() to the pipeline
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // Submit a task to the channel's EventLoop that adds the ServerBootstrapAcceptor handler to the pipeline
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

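ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter. Its job is to take each newly accepted connection (a child Channel), apply the child handler, options, and attributes to it, and register it with the child EventLoopGroup.

Note that at this point the ChannelInitializer has only been added to the pipeline; its initChannel() has not run yet.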

2.1.2 register(channel): register the Channel with an EventLoop

ChannelFuture regFuture = config().group().register(channel);

io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)

  promise.channel().unsafe().register(this, promise);

io.netty.channel.AbstractChannel.AbstractUnsafe#register

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    // Still on the main thread here, so the else branch is taken
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // Hand the task over to the EventLoop
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
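
The inEventLoop() check is the core of Netty's threading model: run inline when already on the loop thread, otherwise enqueue. The following JDK-only sketch imitates that with a single-thread executor. EventLoopSketch and its methods are hypothetical names for illustration and do not mirror Netty's real classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class EventLoopSketch {
    private volatile Thread loopThread;
    // One worker thread plays the role of the event loop; its task queue is FIFO.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "event-loop-sketch");
        loopThread = t;
        return t;
    });
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Runs the named step inline on the loop thread, or queues it when called from elsewhere. */
    void run(String name) {
        if (inEventLoop()) {
            log.add(name + ":inline");
        } else {
            executor.execute(() -> log.add(name + ":queued"));
        }
    }

    static List<String> demo() {
        EventLoopSketch loop = new EventLoopSketch();
        loop.run("register");                            // called from the caller thread: queued
        loop.run("bind");                                // queued behind register
        loop.executor.execute(() -> loop.run("nested")); // called on the loop thread: inline
        loop.executor.shutdown();
        try {
            loop.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(loop.log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the queue is FIFO and there is only one loop thread, a bind submitted after register can never overtake it, which is the guarantee described in the comment at the end of initAndRegister().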

2.1.3 register0(): where the Channel is actually registered

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // Register the Channel with the Selector
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
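        // Registering a server socket does not enter the if below, because it is not bound yet. A socket created by accepting a connection does, because it is already active once accepted.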
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

doRegister is implemented in the parent class AbstractNioChannel:

io.netty.channel.nio.AbstractNioChannel#doRegister

 protected void doRegister() throws Exception {
        boolean selected = false;
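        // Register the Channel with interest set 0 (no events yet) and attach this Channel so it can be retrieved later.
        // The loop retries once after selectNow() if a cancelled key is still cached.
        for (;;) {
            try {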
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

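Although written as for (;;), this loop is not unbounded: if register throws CancelledKeyException, the EventLoop runs selectNow() once to flush the cancelled key and retries, and a second failure is rethrown. The Channel itself is passed as the attachment so that it can be retrieved from the SelectionKey later. Each EventLoop maintains its own Selector.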
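
The attachment round trip can be shown with the JDK alone. In this sketch AttachmentSketch is a hypothetical wrapper playing the role of the Netty Channel: it registers its JDK channel with interest set 0 and passes itself as the attachment, and the demo reads it back from the SelectionKey.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class AttachmentSketch {
    private final ServerSocketChannel javaChannel;
    private SelectionKey selectionKey;

    AttachmentSketch(ServerSocketChannel javaChannel) {
        this.javaChannel = javaChannel;
    }

    /** Registers with no interest ops and attaches this wrapper to the key. */
    void doRegister(Selector selector) throws IOException {
        selectionKey = javaChannel.register(selector, 0, this);
    }

    /** Returns true if the wrapper comes back out of the key and no ops are set yet. */
    static boolean roundTrip() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            AttachmentSketch wrapper = new AttachmentSketch(ch);
            wrapper.doRegister(selector);
            SelectionKey key = ch.keyFor(selector);
            return key == wrapper.selectionKey
                    && key.attachment() == wrapper
                    && key.interestOps() == 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("attachment round trip ok: " + roundTrip());
    }
}
```

When the Selector later reports a ready key, the event loop only has the SelectionKey in hand; the attachment is how it finds its way back to the owning Channel object.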

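Back in register0: once the Channel is registered with the Selector, pipeline.invokeHandlerAddedIfNeeded() is executed, which on the first registration ends up in callHandlerAddedForAllHandlers: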

io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }

    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

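This is where the initChannel() of the ChannelInitializer added in init() gets called. It follows that ChannelInitializer.initChannel() is called only once, after the Channel has been instantiated and successfully registered with the Selector.

[Figure image-20240310160635710]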
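
The deferred-callback mechanism can be reduced to a few lines. The sketch below is a simplified, hypothetical model (PendingCallbackSketch is not a Netty class): handlers added before registration only queue their handlerAdded callback, the queue is drained exactly once at registration, and handlers added afterwards are called back immediately.

```java
import java.util.ArrayList;
import java.util.List;

final class PendingCallbackSketch {
    private static final class Pending {
        final Runnable task;
        Pending next;
        Pending(Runnable task) { this.task = task; }
    }

    private Pending pendingHead;
    private boolean registered;
    final List<String> log = new ArrayList<>();

    void addLast(String name) {
        Runnable handlerAdded = () -> log.add("handlerAdded:" + name);
        synchronized (this) {
            if (!registered) {
                // Not registered yet: remember the callback at the end of the pending list.
                Pending node = new Pending(handlerAdded);
                if (pendingHead == null) {
                    pendingHead = node;
                } else {
                    Pending p = pendingHead;
                    while (p.next != null) {
                        p = p.next;
                    }
                    p.next = node;
                }
                return;
            }
        }
        handlerAdded.run();
    }

    void invokeHandlerAddedIfNeeded() {
        Pending task;
        synchronized (this) {
            if (registered) {
                return; // pending callbacks run only on the first registration
            }
            registered = true;
            task = pendingHead;
            pendingHead = null;
        }
        // Run the callbacks outside the lock, as the Netty comment above explains.
        while (task != null) {
            task.task.run();
            task = task.next;
        }
    }

    static List<String> demo() {
        PendingCallbackSketch p = new PendingCallbackSketch();
        p.addLast("initializer");       // queued, nothing logged yet
        p.log.add("doRegister");
        p.invokeHandlerAddedIfNeeded(); // drains the queue
        p.invokeHandlerAddedIfNeeded(); // no-op
        p.addLast("late");              // already registered: runs immediately
        return p.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```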

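The Channel is now registered with the Selector, so initChannel() submits a task to the Channel's own EventLoop that adds the ServerBootstrapAcceptor handler to the pipeline; that handler takes care of newly accepted connections.

Back in register0 again: after the handler callbacks have run, safeSetSuccess(promise) sets the result on the promise, which notifies the promise's listeners and in turn runs doBind0().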

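io.netty.bootstrap.AbstractBootstrap#doBind

[Figure image-20240310163800134: source of AbstractBootstrap#doBind]

io.netty.bootstrap.AbstractBootstrap#doBind0

[Figure image-20240310163920293: source of AbstractBootstrap#doBind0]

doBind0 hands the bind task over to the Channel's EventLoop, which then calls channel.bind(...).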

io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

Every pipeline has two built-in handlers, head and tail. bind is an outbound operation, so it enters the pipeline at tail and travels toward head.
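
The direction of travel is easier to see in a toy model. The sketch below is a hypothetical, heavily simplified pipeline (PipelineSketch is not Netty's DefaultChannelPipeline): contexts form a doubly linked list, and an outbound bind walks backwards from tail, visiting only contexts that handle outbound operations. It assumes every outbound handler passes the call on, and the "logging" handler is made up for the demo.

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineSketch {
    private static final class Ctx {
        final String name;
        final boolean handlesOutbound;
        Ctx prev;
        Ctx next;
        Ctx(String name, boolean handlesOutbound) {
            this.name = name;
            this.handlesOutbound = handlesOutbound;
        }
    }

    // head performs the real I/O for outbound operations; tail only terminates inbound events.
    private final Ctx head = new Ctx("head", true);
    private final Ctx tail = new Ctx("tail", false);

    PipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a context just before tail. */
    void addLast(String name, boolean handlesOutbound) {
        Ctx ctx = new Ctx(name, handlesOutbound);
        Ctx last = tail.prev;
        ctx.prev = last;
        ctx.next = tail;
        last.next = ctx;
        tail.prev = ctx;
    }

    /** Walks from tail toward head and records every outbound-capable context in visiting order. */
    List<String> bind() {
        List<String> visited = new ArrayList<>();
        for (Ctx ctx = tail.prev; ctx != null; ctx = ctx.prev) {
            if (ctx.handlesOutbound) {
                visited.add(ctx.name);
            }
        }
        return visited;
    }

    static List<String> demo() {
        PipelineSketch p = new PipelineSketch();
        p.addLast("logging", true);   // hypothetical outbound-capable handler
        p.addLast("acceptor", false); // inbound only, skipped by bind
        return p.bind();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the inbound-only acceptor is skipped, so the call reaches head, which corresponds to the findContextOutbound(MASK_BIND) lookup in the method shown next.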

io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    // This already runs on the EventLoop thread, so the if branch is taken
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

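The invokeBind call chain is fairly deep; it ends in io.netty.channel.socket.nio.NioServerSocketChannel#doBind.

[Figure image-20240310164350164: source of NioServerSocketChannel#doBind]

NioServerSocketChannel#javaChannel() returns the JDK ServerSocketChannel. On Java 7 or later, the port is ultimately bound through ServerSocketChannelImpl.bind, which also receives the configured backlog.

After doBind returns, AbstractUnsafe submits a pipeline.fireChannelActive() task to the EventLoop.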

io.netty.channel.AbstractChannel.AbstractUnsafe#bind

[Figure image-20240310164820325: source of AbstractUnsafe#bind]

At this point the pipeline holds three handlers: head, acceptor, and tail. fireChannelActive() passes the event through their channelActive methods, starting at head.

The interesting one is head.channelActive:

io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

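[Figure image-20240310165233107: source of HeadContext#channelActive]

The call chain is again fairly deep and eventually comes back to AbstractNioChannel: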

io.netty.channel.nio.AbstractNioChannel#doBeginRead
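
The article does not show the body of doBeginRead. As far as I can tell from the Netty 4.1 sources, it closes the loop with step five of the native NIO flow by adding the channel's read interest (OP_ACCEPT for a server channel) to the SelectionKey that was registered with 0. The JDK-only sketch below illustrates that idea; BeginReadSketch and its method names are hypothetical, and the logic is a paraphrase, not a copy of Netty's code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class BeginReadSketch {

    /** Adds readInterestOp to the key's interest set if it is not there yet. */
    static void doBeginRead(SelectionKey key, int readInterestOp) {
        if (!key.isValid()) {
            return;
        }
        int interestOps = key.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            key.interestOps(interestOps | readInterestOp);
        }
    }

    /** Returns the interest set before and after doBeginRead. */
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            // Same starting point as doRegister: no interest ops yet.
            SelectionKey key = ch.register(selector, 0, null);
            int before = key.interestOps();
            doBeginRead(key, SelectionKey.OP_ACCEPT);
            int after = key.interestOps();
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = demo();
        System.out.println("before=" + ops[0] + ", after=" + ops[1]);
    }
}
```

Registering with 0 first and enabling OP_ACCEPT only after the bind has succeeded means the Selector never reports accept events for a channel that is not listening yet.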
