Netty-自定义线程池处理长耗时业务

在Netty中,由于I/O线程与EventLoop绑定,长耗时业务会导致阻塞问题。为解决此问题,文章介绍了使用Guava自定义线程池的步骤:引入Guava依赖,定义业务线程池并设置参数,确保线程为守护线程。通过在Handler中提交业务到线程池执行,避免影响I/O操作。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

为什么需要自定义线程处理

在Netty中,EventLoop 和 EventExecutorGroup 提供的普通任务不能解决长耗时业务处理引起的 I/O 线程阻塞问题,究其原因:在 Netty 中,一个 channel 在它的生命中期内只注册于一个 EventLoop,而一个 EventLoop 在它的生命周期内只和一个线程绑定,一个给定的 channel 的 I/O 操作都是由相同的线程进行处理。为此不能直接在该线程上进行长耗时业务处理。为了解决这个问题,通常的做法是:自定义线程池,把长耗时的业务都丢到线程池中去处理。

Guava 自定义线程池处理业务

基本实现思路如下:

  1. 引入 Guava 依赖,Guava是谷歌推出的基于开源的 Java 库,是谷歌很多项目的核心库,该库是为了增强 Java
    的功能和处理能力,我们利用它来实现我们的业务线程池;
  2. 在 Handler 中定义业务线程池,我们利用 Guava 创建线程池,这里面包含了线程启动的方式,线程池必要参数的设置,注意这里一定要设置线程为守护线程而不是用户线程,要不然麻烦多多,守护线程和用户线程的主要区别:主线程结束后用户线程还会继续运行,守护线程则会自动结束;
  3. 耗时业务放入线程池:用法相当简单:service.submit (我们要执行的业务处理),和我们前面讲到的普通任务和定时任务的放置完全一样。

添加依赖

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>

编码实现

服务端实现
package org.skystep.tcpserv;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
   
    public static void main(String[] args) {
   
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup wokerGroup = new NioEventLoopGroup();

        try {
   
            //用于启动NIO服务
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //通过工厂方法设计模式实例化一个channel
            serverBootstrap.group(bossGroup, wokerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer());
            //绑定服务器,该实例将提供有关IO操作的结果或状态的信息
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            System.out.println("在" + channelFuture.channel().localAddress(
### 如何在 Netty 中配置和使用线程池 #### 配置 IO 和业务线程池分离 为了提高性能并防止耗时操作影响到 I/O 操作,建议将负责网络读写的 `Worker` 线程组与执行具体应用逻辑的工作线程区分开来。通过这种方式能够确保即使某些请求处理时间较也不会阻碍其他连接上的数据交换过程[^1]。 ```java // 创建Boss Group用于接受新的连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // Worker Group用来处理已经被建立的连接中的事件 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 将业务处理器绑定至不同的线程池中运行 ch.pipeline().addLast("decoder", new StringDecoder()); ch.pipeline().addLast("encoder", new StringEncoder()); // 使用默认的业务线程池自定义线程池 EventExecutorGroup businessThreadPool = new DefaultEventExecutorGroup(16); ch.pipeline().addLast(businessThreadPool, "businessHandler", new BusinessLogicHandler()); } }); } finally { // 关闭资源... } ``` #### 利用 `ChannelPipeline.addLast()` 方法指定线程池 当向管道添加处理器时可以通过传递一个实现了 `EventExecutorGroup` 接口的对象作为参数给 `addLast()` 函数从而指明该处理器应当在哪种类型的线程上被执行。这样做的好处是可以灵活控制不同阶段的任务分配策略以及更好地管理并发度[^3]。 ```java // 假设有一个已经初始化好的ChannelPipeline实例pipeline // 定义一个新的线程池专门用于处理某种特定类型的业务消息 EventExecutorGroup specialBusinessPool = new DefaultEventExecutorGroup(8); // 添加解码器和其他必要的预处理器 pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(MyMessage.getDefaultInstance())); // 把实际的业务处理器加入到由specialBusinessPool所代表的一组工作线程当中去 pipeline.addLast(specialBusinessPool, "mySpecialHandler", new MySpecialHandler()); // 后续还可以继续添加其他的组件比如编码器之类的 pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); ```
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值