springboot整合netty的步骤

springboot整合netty框架

 Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。


那么如何和springboot这个比较流行的框架进行整合呢?Netty与SpringBoot的整合,我想无非就是要整合几个地方

让netty跟springboot生命周期保持一致,同生共死
让netty能用上ioc中的Bean
让netty能读取到全局的配置


首先整个项目引入pom

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
</dependency>

是的,不用声明版本号。因为 spring-boot-dependencies 中已经声明了最新的netty依赖。


配置Netty的配置类

一、通过yaml配置基本的属性

server:
  port: 80

logging:
    level:
      root: DEBUG

management:
  endpoints: 
    web:
      exposure:
        include: "*"
    
  endpoint:
    shutdown:
      enabled: true

netty:
  websocket:
    # Websocket服务端口
    port: 1024
    # 绑定的网卡
    ip: 0.0.0.0
    # 消息帧最大体积
    max-frame-size: 10240
    # URI路径
    path: /channel

通过 ApplicationRunner 启动Websocket服务

二、在Controller里新建NettyBootsrapRunner 类

import java.net.InetSocketAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.springboot.netty.websocket.handler.WebsocketMessageHandler;

/**
 * 初始化Netty服务
 * @author Administrator
 */
@Component
public class NettyBootsrapRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {

	private static final Logger LOGGER = LoggerFactory.getLogger(NettyBootsrapRunner.class);
	
	@Value("${netty.websocket.port}")
	private int port;

	@Value("${netty.websocket.ip}")
	private String ip;
	
	@Value("${netty.websocket.path}")
	private String path;
	
	@Value("${netty.websocket.max-frame-size}")
	private long maxFrameSize;
	
	private ApplicationContext applicationContext;
	
	private Channel serverChannel;
	
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}
	
	public void run(ApplicationArguments args) throws Exception {
		
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			serverBootstrap.group(bossGroup, workerGroup);
			serverBootstrap.channel(NioServerSocketChannel.class);
			serverBootstrap.localAddress(new InetSocketAddress(this.ip, this.port));
			serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel socketChannel) throws Exception {
					ChannelPipeline pipeline = socketChannel.pipeline();
					pipeline.addLast(new HttpServerCodec());
					pipeline.addLast(new ChunkedWriteHandler());
					pipeline.addLast(new HttpObjectAggregator(65536));
					pipeline.addLast(new ChannelInboundHandlerAdapter() {
						@Override
						public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
							if(msg instanceof FullHttpRequest) {
								FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
								String uri = fullHttpRequest.uri();
								if (!uri.equals(path)) {
									// 访问的路径不是 websocket的端点地址,响应404
									ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
										.addListener(ChannelFutureListener.CLOSE);
									return ;
								}
							}
							super.channelRead(ctx, msg);
						}
					});
					pipeline.addLast(new WebSocketServerCompressionHandler());
					pipeline.addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize));

					/**
					 * 从IOC中获取到Handler
					 */
					pipeline.addLast(applicationContext.getBean(WebsocketMessageHandler.class));
				}
			});
			Channel channel = serverBootstrap.bind().sync().channel();	
			this.serverChannel = channel;
			LOGGER.info("websocket 服务启动,ip={},port={}", this.ip, this.port);
			channel.closeFuture().sync();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	public void onApplicationEvent(ContextClosedEvent event) {
		if (this.serverChannel != null) {
			this.serverChannel.close();
		}
		LOGGER.info("websocket 服务停止");
	}
}



NettyBootsrapRunner 实现了 ApplicationRunner, ApplicationListener, ApplicationContextAware 接口。

这样一来,NettyBootsrapRunner 可以在App的启动和关闭时执行Websocket服务的启动和关闭。而且通过 ApplicationContextAware 还能获取到 ApplicationContext

1.通过IOC管理 Netty 的Handler

WebsocketMessageHandler

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.springboot.netty.service.DiscardService;
/**
 * 
 * @author Administrator
 *
 */
@Sharable
@Component
public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketMessageHandler.class);
	
	@Autowired
	DiscardService discardService;
	
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
		if (msg instanceof TextWebSocketFrame) {
			TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
			// 业务层处理数据
			this.discardService.discard(textWebSocketFrame.text());
			// 响应客户端
			ctx.channel().writeAndFlush(new TextWebSocketFrame("我收到了你的消息:" + System.currentTimeMillis()));
		} else {
			// 不接受文本以外的数据帧类型
			ctx.channel().writeAndFlush(WebSocketCloseStatus.INVALID_MESSAGE_TYPE).addListener(ChannelFutureListener.CLOSE);
		}
	}
	
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		super.channelInactive(ctx);
		LOGGER.info("链接断开:{}", ctx.channel().remoteAddress());
	}
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		super.channelActive(ctx);
		LOGGER.info("链接创建:{}", ctx.channel().remoteAddress());
	}
}

handler已经是一个IOC管理的Bean,可以自由的使用依赖注入等Spring带来的快捷功能。由于是单例存在,所有的链接都使用同一个hander,所以尽量不要保存任何实例变量。

这个Handler处理完毕客户端的消息后,给客户端会响应一条:“我收到了你的消息:” + System.currentTimeMillis() 的消息

为了演示在Handler中使用业务层,这里假装注入了一个 DiscardService 服务。它的逻辑很简单,就是丢弃消息

2.DiscardService 服务

代码如下(示例):

public void discard (String message) {
	LOGGER.info("丢弃消息:{}", message);
}

演示

启动客户端

<!DOCTYPE html>
	<html>
	<head>
		<meta charset="UTF-8">
		<title>Websocket</title>
	</head>
	<body>
	
	</body>
	<script type="text/javascript">
		;(function(){
			const websocket = new WebSocket('ws://localhost:1024/channel');
			websocket.onmessage = e => {
				console.log('收到消息:', e.data);
			}
			websocket.onclose = e => {
				let {code, reason} = e;
				console.log(`链接断开:code=${code}, reason=${reason}`);
			}
			websocket.onopen = () => {
				console.log(`链接建立...`);
				websocket.send('Hello');
			}
			websocket.onerror = e => {
				console.log('链接异常:', e);
			}
		})();

	</script>
</html>




链接创建后就给服务端发送一条消息:Hello。Netty会在SpringBoot App启动后启动,App停止后关闭,可以正常的对外提供服务 并且Handler交给IOC管理可以注入Service,完成业务处理。

### Spring Boot 整合 Netty 和 WebSocket 实现方案 #### 1. 添加依赖项 要在Spring Boot项目中集成Netty和WebSocket,首先需要在`pom.xml`文件中添加必要的依赖项。这通常包括Spring WebFlux、Netty以及WebSocket支持的相关库。 ```xml <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> ``` 这些依赖项提供了构建基于反应式的Web应用程序所需的基础结构和支持[^4]。 #### 2. 创建WebSocket配置类 创建一个Java配置类来定义WebSocket服务器的行为。通过继承`WebSocketServerEndpointConfigurer`并重写相应的方法可以自定义握手处理器和其他设置。 ```java @Configuration public class WebSocketConfig implements WebSocketServerCustomizer { @Override public void customize(ReactiveWebSocketService service) { // 自定义WebSocket服务行为 } } ``` 此部分代码用于初始化WebSocket的服务端逻辑,并允许开发者根据需求调整其工作方式[^2]。 #### 3. 编写处理程序 编写具体的WebSocket事件处理程序,负责监听来自客户端的消息并与之交互。这里展示了一个简单的例子: ```java @Component @ServerEndpoint("/ws/{userId}") public class MyWebSocketHandler extends TextWebSocketHandler { private final Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class); @Autowired private PushMsgService pushMsgService; @OnOpen public void handleConnection(ServerHttpRequest request, ServerHttpResponse response, WebSocketSession session, EndpointConfig config) throws IOException { String userId = (String)session.getAttributes().get("USER_ID"); this.pushMsgService.addUser(userId, session); logger.info("New connection from user {}", userId); } @OnMessage public void handleMessage(TextMessage message, WebSocketSession session) throws IOException { // 处理接收到的信息 } @OnError public void handleError(Throwable error){ // 错误处理机制 } @OnClose public void handleClose(CloseReason reason, WebSocketSession session)throws Exception{ // 断开连接后的清理操作 } } ``` 上述代码片段展示了如何利用注解驱动的方式捕获不同类型的WebSocket生命周期事件,并执行相应的业务逻辑[^5]。 #### 4. 构建消息推送服务 为了能够向已建立的WebSocket会话发送通知或更新数据给前端页面,还需要开发一套有效的消息广播系统。下面给出了一种可能的设计模式——即封装好的`PushMsgService`接口及其默认实现: ```java @Service public class SimplePushMsgServiceImpl implements PushMsgService { private Map<String, List<WebSocketSession>> usersSessionsMap = new ConcurrentHashMap<>(); @Override public synchronized void addUser(String userId, WebSocketSession session) { if (!usersSessionsMap.containsKey(userId)) { usersSessionsMap.putIfAbsent(userId, Collections.synchronizedList(new ArrayList<>())); } usersSessionsMap.get(userId).add(session); } @Override public void pushMsgToOne(String userId, String msg) { Optional.ofNullable(usersSessionsMap.get(userId)) .ifPresent(sessions -> sessions.forEach(session -> sendToSession(msg, session))); } @Override public void pushMsgToAll(String msg) { usersSessionsMap.values() .stream() .flatMap(Collection::stream) .forEach(session -> sendToSession(msg, session)); } private void sendToSession(String msg, WebSocketSession session) { try { if (session.isOpen()) { session.sendMessage(new TextMessage(msg)); } } catch (IOException e) { log.error("Failed to send message", e); } } } ``` 这段代码实现了基本的一对一及一对多的消息分发能力,确保任何时刻都能及时有效地传递信息到目标用户那里。 #### 5. 测试与调试 最后,在完成以上步骤之后就可以启动应用并通过工具如Postman或者专门设计用来测试WebSocket的应用来进行验证了。确保一切正常运行后再考虑部署上线等问题[^3]。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

Wyangcsdb

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值