使用node接收warlock的数据
1 目标
本文的目的是使用node接收warlock的数据。
2 代码
2.1 安装包
pip install grpcio grpcio-tools
2.2 使用 protoc 命令生成 Python 代码
运行后会生成2个py文件(af_service_pb2.py和af_service_pb2_grpc.py)
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. af_service.proto
2.3 主程序(需要使用多线程处理多个stream)
import grpc
import time
from af_service_pb2_grpc import AFDataCollectServiceStub
from af_service_pb2 import Request
import threading
# 配置
CONFIG = {
'server_address': '127.0.0.1:5005',
'max_retries': 3,
'retry_delay': 2, # 秒
'connection_timeout': 5, # 秒
}
# 重试状态
state = {
'current_retry': 0,
'is_shutting_down': False
}
# ================= 工具函数 ===================
def safe_number(num, precision=6):
return f"{
num:.{
precision}f}" if isinstance(num, (int, float)) else "无数据"
def format_position(pos):
if not pos:
return ' 位置信息: 无可用数据'
return f""" 位置详情:
坐标: ({
safe_number(pos.lat)}, {
safe_number(pos.lon)}) 高度: {
safe_number(pos.alt, 1)}米
北东天坐标: [X: {
safe_number(pos.neu_x, 2)}, Y: {
safe_number(pos.neu_y, 2)}, Z: {
safe_number(pos.neu_z, 2)}]
姿态: 航向 {
safe_number(pos.heading, 1)}°, 俯仰 {
safe_number(pos.pitch, 1)}°, 横滚 {
safe_number(pos.roll, 1)}°
速度: {
safe_number(pos.speed, 1)} 米/秒 (北东天速度: [{
safe_number(pos.neu_vx, 1)}, {
safe_number(pos.neu_vy, 1)}, {
safe_number(pos.neu_vz, 1)}])
角度: 攻角 {
safe_number(pos.attack_angle, 2)}°, 侧滑角 {
safe_number(pos.sideslip_angle, 2)}°
"""
def format_sensor_info(sensors):