@Sharable
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
//使用4个字节表示流的长度
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
//获取写入下标
int startIdx = out.writerIndex();
//封装输出流
ByteBufOutputStream bout = new ByteBufOutputStream(out);
ObjectOutputStream oout = null;
try {
//写入4个空字节
bout.write(LENGTH_PLACEHOLDER);
//序列化对象到底层输出流
oout = new CompactObjectOutputStream(bout);
oout.writeObject(msg);
oout.flush();
} finally {
if (oout != null) {
oout.close();
} else {
bout.close();
}
}
//获取写入数据后的写入下标
int endIdx = out.writerIndex();
//在流的起始位置,写入流的长度endIdx - startIdx - 4
out.setInt(startIdx, endIdx - startIdx - 4);
}
}
package io.netty.handler.codec.serialization;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
//继承LengthFieldBasedFrameDecoder
//0, 4, 0, 4 长度字段为4个字节 读取消息后跳过4个字节
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
private final ClassResolver classResolver;
public ObjectDecoder(ClassResolver classResolver) {
this(1048576, classResolver);
}
public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
super(maxObjectSize, 0, 4, 0, 4);
this.classResolver = classResolver;
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//调用父类解码器-如果拿到frame则为一个完整对象流
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
//直接返序列化
ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
try {
return ois.readObject();
} finally {
ois.close();
}
}
}