场景描述
主要是为了解决企业的大宗车辆位置查询,车辆定位,车辆道路报警,以及车辆的围栏撞击,基于位置派派车等相关的操作。方便企业对车辆的智能化管控,提供可视化的调度和管控。这里我所说的解析服务是对所有上传的车辆位置和设备信息的解析。
车辆设备或【千云司机端 】 -> 企业设备绑定【千云货主端】 -> 设备位置和轨迹查看(承运商查看)(货主查看) -> 设备报警
应用技术
以下是用到的技术,后续会有部分的伪代码。
tcp提供和设备的长连接服务,
netty提供多个设备的解析方案,
springboot+logback日志采集
kafka转发解析消息,
总体的设计方案
项目依赖
以下是对应的伪代码
基础的handler的构建工厂。
public abstract class BaseFactory implements ChannelPipelineFactory {
private final DecodeServer server;
private int timeout;
private static final class OpenChannelHandler extends SimpleChannelHandler {
private final DecodeServer server;
private OpenChannelHandler(DecodeServer server) {
this.server = server;
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
server.channel = e.getChannel();
}
}
public BaseFactory(DecodeServer server) {
this.server = server;
this.timeout = (int) Constant.LINK_TIMEOUT;
}
protected abstract void addHandlers(ChannelPipeline pipeline);
@Override
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = Channels.pipeline();
//添加心跳的handler
if (timeout > 0 && !server.isConnectionless()) {
pipeline.addLast("idleHandler", new IdleStateHandler(GlobalTimer.getTimer(), timeout, 0, 0));
}
//开启hander
pipeline.addLast("openHandler", new OpenChannelHandler(server));
pipeline.addLast("logger", new LoggerHandler());
//添加pipeline
addHandlers(pipeline);
//设备消息的handler
pipeline.addLast("positionHandler", new PositionHandler());
pipeline.addLast("mainHandler", new MainHandler());
return pipeline;
}
}
多个设备解析的自定义的handler,启动对应的的设备指定的端口
public DecodeServer(Bootstrap bootstrap) {
this.bootstrap = bootstrap;
channelFactory = new NioServerSocketChannelFactory();
// 设置通道工厂
bootstrap.setFactory(channelFactory);
//
bootstrap.setPipelineFactory(new BaseFactory(this) {
@Override
protected void addHandlers(ChannelPipeline pipeline) {
DecodeServer.this.addHandlers(pipeline);
}
});
}
//流出对应的扩展handler
protected abstract void addHandlers(ChannelPipeline pipeline);
public void start(int port) {
InetSocketAddress endpoint;
endpoint = new InetSocketAddress(port);
channel = ((ServerBootstrap) bootstrap).bind(endpoint);
}
对应的手机设备的位置解析
private final DecodeServer mobileServer = new DecodeServer(new ServerBootstrap()) {
@Override
protected void addHandlers(ChannelPipeline pipeline) {
pipeline.addLast("mobileframeDecoder", new MobileFrameDecoder());
pipeline.addLast("mobileDecoder", new MobileDecoder());
}
};
消息接收转发
之前想通过日志的方式进行对kafka消息的转发,后来想到在解析转发的过程中,可能需要对消息预先进行一些过滤,还有可能在平台上进行数据的保存存储。所以没有进行日志的收集。大致的思路就是不想在下游的服务不进行过滤的数据。
500设备一年大约是不到500G,在算服务器配置的时候需要进行考虑。我们现在的服务都是部署到线下。通过线上的处理进行线下位置数据保存。这里就相对简单了,将上传的位置数据转发出去。
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
if (e.getMessage() != null && e.getMessage() instanceof Position) {
Position pos = (Position) e.getMessage();
if (pos.getHappenTime() == null) {
pos.setHappenTime(new Date(0));
}
//发送消息
SpringHolder.getPositionProducer().send(SpringHolder.getTopic(), JSON.toJSONString(pos));
Log.logPosition(Log.formatChannel(e.getChannel()), pos.getDeviceId(), pos);
}
}