诊断数据轮询器DiagPacketPoller类实现

public class DiagPacketPoller : IDataPoller
{
    internal static Threader<Controller> Threader = new Threader<Controller>(delegate (Controller controller, DelegateSpooler spooler)
    {
        controller.ControlCenter.Diagnostics.Update(spooler);
    }, typeof(DiagPacketPoller).Name);

    private object subscriptionMutex = new object();

    private Controller controller;

    private volatile bool controllerResetting;

    private bool enabled;

    private bool autoStart = true;

    private ControllerDiagPacket last;

    private EventHandler<ErrorEventArgs> errorOccurred;

    private EventHandler<NewDiagPacketArrivedEventArgs> newDiagPacketArrivedEventHandler;

    public bool IsSuspended => !enabled;

    public bool AutoStart
    {
        get
        {
            return autoStart;
        }
        set
        {
            autoStart = value;
        }
    }

    public ControllerDiagPacket Latest => last;

    public bool IsExecutingEvent => Threader.Spooler.IsPipeExecuting(controller.Number);

    public int RefreshInterval
    {
        get
        {
            return Threader.Interval;
        }
        set
        {
            Threader.Interval = value;
        }
    }

    public event EventHandler<ErrorEventArgs> ErrorOccurred
    {
        add
        {
            lock (subscriptionMutex)
            {
                errorOccurred = (EventHandler<ErrorEventArgs>)Delegate.Combine(errorOccurred, value);
            }
        }
        remove
        {
            lock (subscriptionMutex)
            {
                errorOccurred = (EventHandler<ErrorEventArgs>)Delegate.Remove(errorOccurred, value);
            }
        }
    }

    public event EventHandler<NewDiagPacketArrivedEventArgs> NewDiagPacketArrived
    {
        add
        {
            lock (subscriptionMutex)
            {
                newDiagPacketArrivedEventHandler = (EventHandler<NewDiagPacketArrivedEventArgs>)Delegate.Combine(newDiagPacketArrivedEventHandler, value);
                if (autoStart)
                {
                    Threader.AddSubscriber(controller);
                    enabled = true;
                }
            }
        }
        remove
        {
            lock (subscriptionMutex)
            {
                newDiagPacketArrivedEventHandler = (EventHandler<NewDiagPacketArrivedEventArgs>)Delegate.Remove(newDiagPacketArrivedEventHandler, value);
                if (newDiagPacketArrivedEventHandler == null || newDiagPacketArrivedEventHandler.GetInvocationList().Length == 0)
                {
                    Threader.RemoveSubscriber(controller, controller.Number);
                    enabled = false;
                }
            }
        }
    }

    internal DiagPacketPoller(Controller controller)
    {
        this.controller = controller;
        this.controller.Information.ControllerResettingPre += delegate
        {
            controllerResetting = true;
        };
        this.controller.Information.ControllerResetPost += delegate
        {
            controllerResetting = false;
        };
    }

    public void Suspend()
    {
        if (enabled)
        {
            lock (subscriptionMutex)
            {
                Threader.RemoveSubscriber(controller, controller.Number);
                enabled = false;
            }
        }
    }

    public void Resume()
    {
        if (!enabled)
        {
            lock (subscriptionMutex)
            {
                Threader.AddSubscriber(controller);
                enabled = true;
            }
        }
    }

    internal void RaiseNewDiagPacketArrived(ControllerDiagPacket diagPacket)
    {
        EventHandler<NewDiagPacketArrivedEventArgs> eventHandler;
        lock (subscriptionMutex)
        {
            eventHandler = newDiagPacketArrivedEventHandler;
        }

        eventHandler?.Invoke(this, new NewDiagPacketArrivedEventArgs(controller, diagPacket));
    }

    void IDataPoller.UnsubscribeAll()
    {
        lock (subscriptionMutex)
        {
            if (newDiagPacketArrivedEventHandler != null)
            {
                Delegate[] invocationList = newDiagPacketArrivedEventHandler.GetInvocationList();
                for (int i = 0; i < invocationList.Length; i++)
                {
                    EventHandler<NewDiagPacketArrivedEventArgs> value = (EventHandler<NewDiagPacketArrivedEventArgs>)invocationList[i];
                    NewDiagPacketArrived -= value;
                }
            }

            if (errorOccurred != null)
            {
                Delegate[] invocationList = errorOccurred.GetInvocationList();
                for (int i = 0; i < invocationList.Length; i++)
                {
                    EventHandler<ErrorEventArgs> value2 = (EventHandler<ErrorEventArgs>)invocationList[i];
                    errorOccurred = (EventHandler<ErrorEventArgs>)Delegate.Remove(errorOccurred, value2);
                }
            }
        }
    }

    internal void RaiseErrorOccurred(Exception e)
    {
        EventHandler<ErrorEventArgs> eventHandler = null;
        lock (subscriptionMutex)
        {
            eventHandler = errorOccurred;
        }

        eventHandler?.Invoke(this, new ErrorEventArgs(controller, e));
    }

    private void Update(DelegateSpooler spooler)
    {
        int number = controller.Number;
        ControllerDiagPacket data = null;
        try
        {
            data = controller.DataCollection.RetrieveDiagnostics();
        }
        catch (Exception ex)
        {
            Exception ex2 = ex;
            Exception e = ex2;
            spooler.Set(number, delegate
            {
                controller.ControlCenter.Diagnostics.RaiseErrorOccurred(e);
            });
            if (!(e is CommunicationException))
            {
                last = null;
            }

            return;
        }

        if (controllerResetting || data == null)
        {
            return;
        }

        last = data;
        spooler.Set(number, delegate
        {
            if (!controllerResetting)
            {
                RaiseNewDiagPacketArrived(data);
            }
        });
    }
}

 

1. 核心功能

这是一个 诊断数据轮询器,实现了 IDataPoller 接口,主要功能是:

  • 定期从控制器(Controller)获取诊断数据包ControllerDiagPacket)。

  • 通过事件机制通知订阅者:

    • 新数据到达NewDiagPacketArrived

    • 发生错误ErrorOccurred

  • 支持 启动/暂停轮询自动订阅管理 和 线程安全控制


2. 关键成员分析

(1)线程与轮询控制
  • Threader<Controller>
    核心线程管理器,内部通过委托调用 Update 方法,实现定时轮询。

    • RefreshInterval 属性控制轮询间隔。

    • 使用 DelegateSpooler 确保事件在正确的线程上下文中执行。

  • enabled 与 autoStart

    • enabled:标记当前是否启用轮询。

    • autoStart:当首个订阅者注册时自动开始轮询(默认开启)。

(2)事件管理
  • 事件列表

    • NewDiagPacketArrived:新数据到达事件。

    • ErrorOccurred:错误事件(如通信异常)。

  • 线程安全设计
    通过 subscriptionMutex 锁保护事件订阅/取消订阅操作,避免多线程冲突。

(3)状态跟踪
  • controllerResetting
    volatile 标记控制器是否正在重置,防止在重置期间处理无效数据。

  • last
    缓存最后一次有效的诊断数据包(Latest 属性对外暴露)。


3. 核心方法解析

(1)轮询逻辑(Update 方法)
private void Update(DelegateSpooler spooler) {
    // 1. 从控制器获取诊断数据
    var data = controller.DataCollection.RetrieveDiagnostics();
    
    // 2. 处理异常(如通信失败)
    catch (Exception ex) {
        spooler.Queue(() => RaiseErrorOccurred(ex)); // 异步触发错误事件
        last = null; // 清空缓存
    }
    
    // 3. 有效数据到达时更新并通知
    if (!controllerResetting && data != null) {
        last = data;
        spooler.Queue(() => RaiseNewDiagPacketArrived(data));
    }
}
  • 异常处理:捕获通信异常等,通过 ErrorOccurred 事件通知调用方。

  • 状态检查:忽略控制器重置期间的数据。

(2)事件触发
  • RaiseNewDiagPacketArrived
    安全地触发 NewDiagPacketArrived 事件,传递控制器和诊断数据。

  • RaiseErrorOccurred
    类似地触发错误事件,包含异常信息。

(3)订阅管理
  • 事件订阅/取消

    • 当首个订阅者注册时,若 autoStart=true 则自动启动轮询(Threader.AddSubscriber)。

    • 当最后一个订阅者取消时,自动停止轮询(Threader.RemoveSubscriber)。

  • UnsubscribeAll
    强制清除所有事件订阅者(实现 IDataPoller 接口要求)。


4. 线程安全设计

  • 锁的应用

    • subscriptionMutex 保护事件订阅逻辑(add/remove 访问器)。

    • volatile 确保多线程对 controllerResetting 的可见性。

  • 委托队列(DelegateSpooler
    将事件调用封装为委托任务,由 Threader 统一调度,避免跨线程问题。


5. 典型使用场景

var poller = new DiagPacketPoller(controller);
poller.NewDiagPacketArrived += (sender, e) => {
    Console.WriteLine($"New data: {e.DiagPacket}");
};
poller.ErrorOccurred += (sender, e) => {
    Console.WriteLine($"Error: {e.Exception.Message}");
};

6. 改进建议

  1. 资源释放
    可考虑实现 IDisposable 接口,在销毁时主动释放 Threader 资源。

  2. 日志记录
    增加详细日志(如轮询周期、异常堆栈),便于调试。

  3. 取消支持
    引入 CancellationToken 支持优雅停止轮询。


总结

该类是一个典型的 生产者-消费者模型 实现,通过事件机制解耦数据采集与处理逻辑,适合需要后台轮询硬件状态的工业控制场景。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值