Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息
零、 目录
- IM系统简介
- Netty 简介
- Netty 环境配置
- 服务端启动流程
- 客户端启动流程
- 实战: 客户端和服务端双向通信
- 数据传输载体ByteBuf介绍
- 客户端与服务端通信协议编解码
- 实现客户端登录
- 实现客户端与服务端收发消息
- pipeline与channelHandler
- 构建客户端与服务端pipeline
- 拆包粘包理论与解决方案
- channelHandler的生命周期
- 使用channelHandler的热插拔实现客户端身份校验
- 客户端互聊原理与实现
- 群聊的发起与通知
- 群聊的成员管理(加入与退出,获取成员列表)
- 群聊消息的收发及Netty性能优化
- 心跳与空闲检测
- 总结
- 扩展
一、 实现需求
- 这一小节 , 我们来实现客户端与服务端收发消息, 我们要实现的具体功能是: 在控制台输入一条消息后按回车 , 交验完客户端登录状态之后 , 把消息发送到服务端 , 服务端收到消息之后打印并向客户端回复一条消息 , 客户端收到消息后打印。
二、 代码框架
-
在代码框架中我们已经实现了 服务端启动 、 客户端启动 、 客户端与服务端双向通信 、 客户端与服务端通信协议编解码 、 客户端登录的逻辑 , 接下来你可以把代码框架粘贴到你的编辑器中跟我来一起实现客户端与服务端收发消息
import java.lang.reflect.Method; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import com.alibaba.fastjson.JSONObject; import com.tj.NIO_test_maven.Test_11_LoginResponsePacket.Code; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.Data; /** * 2019年1月3日 * * @author outman * * 实现客户端和服务端收发消息 * */ public class Test_11_实现客户端与服务端收发消息 { public static void main(String[] args) { // 启动服务端 Test_11_server.start(8000); // 启动客户端 Test_11_client.start("127.0.0.1", 8000, 5); } } /** * 2019年1月3日 * * @author outman * * 服务端 */ class Test_11_server { /** * @desc 服务端启动 * @param port */ public static void start(int port) { NioEventLoopGroup bossgroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossgroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 添加服务端处理逻辑 ch.pipeline().addLast(new Test_11_serverHandler()); } }); bind(serverBootstrap, port); } /** * @desc 自动绑定递增并启动服务端 * @param serverBootstrap * @param port */ private static void bind(ServerBootstrap serverBootstrap, int port) { serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { System.out.println("服务端:" + new Date() + "绑定端口【" + port + "】成功"); } else { System.out.println("服务端:" + new Date() + "绑定端口【" + port + "】失败,执行递增绑定"); bind(serverBootstrap, port + 1); } }); } } /** * 2019年1月3日 * * @author outman * * 客户端 */ class Test_11_client { /** * 客户端启动 * * @param ip * 连接ip * @param port * 服务端端口 * @param maxRetry * 最大重试次数 */ public static void start(String ip, int port, int maxRetry) { NioEventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup).channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 添加 客户端处理逻辑 ch.pipeline().addLast(new Test_11_clientHandler()); } }); // 连接服务端 connect(bootstrap, ip, port, maxRetry); } /** * @desc 连接服务端 * @param bootstrap * @param ip * @param port * @param maxRetry * @param retryIndex * 重试计数 */ private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) { bootstrap.connect(ip, port).addListener(future -> { int[] finalRetryIndex; // 初始化 重连计数 if (retryIndex.length == 0) { finalRetryIndex = new int[] { 0 }; } else { finalRetryIndex = retryIndex; } // 判断连接状态 if (future.isSuccess()) { System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】成功"); } else if (maxRetry <= 0) { System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败,达到重连最大次数放弃重连"); } else { // 重连使用退避算法 int delay = 1 << finalRetryIndex[0]; System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败," + delay + "秒后执行重试"); bootstrap.config().group().schedule(() -> { connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1); }, delay, TimeUnit.SECONDS); } }); } } /** * 客户端处理逻辑 * * @author outman */ class Test_11_clientHandler extends ChannelInboundHandlerAdapter { /** * 连接成功时触发 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端:" + new Date() + "开始登陆"); // 创建登陆对象 Test_11_LoginRequestPacket loginRequestPacket = new Test_11_LoginRequestPacket(); // 随机取ID 1~999 loginRequestPacket.setUserId((int) (Math.random() * 1000) + 1); loginRequestPacket.setUserName("outman"); loginRequestPacket.setPassword("123456"); // 编码 ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket); // 写出数据 ctx.channel().writeAndFlush(byteBuf); } /** * 有数据可读时触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; // 数据包解码 Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf); // 根据不同的指令选择对应的处理逻辑 switch (packet.getCommand()) { case Test_11_Packet.Command.LOGIN_RESPONSE: Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet; System.out.println("客户端:" + new Date() + "收到服务端响应【" + loginResponsePacket.getMsg() + "】"); break; default: break; } } } /** * 服务端处理逻辑 * * @author outman */ class Test_11_serverHandler extends ChannelInboundHandlerAdapter { /** * 连接成功时触发 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } /** * 有数据可读时触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { ByteBuf byteBuf = (ByteBuf) obj; // 解码 Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf); // 根据指令执行对应的处理逻辑 switch (packet.getCommand()) { case Test_11_Packet.Command.LOGIN_REQUEST: Test_11_LoginRequestPacket loginRequestPacket = (Test_11_LoginRequestPacket) packet; // 模拟校验成功 System.out.println("服务端:" + new Date() + "【" + loginRequestPacket.getUserName() + "】 登陆成功"); // 给服务端响应 Test_11_LoginResponsePacket loginResponsePacket = new Test_11_LoginResponsePacket(); loginResponsePacket.setCode(Code.SUCCESS); loginResponsePacket.setMsg("登陆成功!"); // 编码 byteBuf = Test_11_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket); // 写出数据 ctx.channel().writeAndFlush(byteBuf); break; default: System.out.println(&#