public class DiagPacketPoller : IDataPoller
{
internal static Threader<Controller> Threader = new Threader<Controller>(delegate (Controller controller, DelegateSpooler spooler)
{
controller.ControlCenter.Diagnostics.Update(spooler);
}, typeof(DiagPacketPoller).Name);
private object subscriptionMutex = new object();
private Controller controller;
private volatile bool controllerResetting;
private bool enabled;
private bool autoStart = true;
private ControllerDiagPacket last;
private EventHandler<ErrorEventArgs> errorOccurred;
private EventHandler<NewDiagPacketArrivedEventArgs> newDiagPacketArrivedEventHandler;
public bool IsSuspended => !enabled;
public bool AutoStart
{
get
{
return autoStart;
}
set
{
autoStart = value;
}
}
public ControllerDiagPacket Latest => last;
public bool IsExecutingEvent => Threader.Spooler.IsPipeExecuting(controller.Number);
public int RefreshInterval
{
get
{
return Threader.Interval;
}
set
{
Threader.Interval = value;
}
}
public event EventHandler<ErrorEventArgs> ErrorOccurred
{
add
{
lock (subscriptionMutex)
{
errorOccurred = (EventHandler<ErrorEventArgs>)Delegate.Combine(errorOccurred, value);
}
}
remove
{
lock (subscriptionMutex)
{
errorOccurred = (EventHandler<ErrorEventArgs>)Delegate.Remove(errorOccurred, value);
}
}
}
public event EventHandler<NewDiagPacketArrivedEventArgs> NewDiagPacketArrived
{
add
{
lock (subscriptionMutex)
{
newDiagPacketArrivedEventHandler = (EventHandler<NewDiagPacketArrivedEventArgs>)Delegate.Combine(newDiagPacketArrivedEventHandler, value);
if (autoStart)
{
Threader.AddSubscriber(controller);
enabled = true;
}
}
}
remove
{
lock (subscriptionMutex)
{
newDiagPacketArrivedEventHandler = (EventHandler<NewDiagPacketArrivedEventArgs>)Delegate.Remove(newDiagPacketArrivedEventHandler, value);
if (newDiagPacketArrivedEventHandler == null || newDiagPacketArrivedEventHandler.GetInvocationList().Length == 0)
{
Threader.RemoveSubscriber(controller, controller.Number);
enabled = false;
}
}
}
}
internal DiagPacketPoller(Controller controller)
{
this.controller = controller;
this.controller.Information.ControllerResettingPre += delegate
{
controllerResetting = true;
};
this.controller.Information.ControllerResetPost += delegate
{
controllerResetting = false;
};
}
public void Suspend()
{
if (enabled)
{
lock (subscriptionMutex)
{
Threader.RemoveSubscriber(controller, controller.Number);
enabled = false;
}
}
}
public void Resume()
{
if (!enabled)
{
lock (subscriptionMutex)
{
Threader.AddSubscriber(controller);
enabled = true;
}
}
}
internal void RaiseNewDiagPacketArrived(ControllerDiagPacket diagPacket)
{
EventHandler<NewDiagPacketArrivedEventArgs> eventHandler;
lock (subscriptionMutex)
{
eventHandler = newDiagPacketArrivedEventHandler;
}
eventHandler?.Invoke(this, new NewDiagPacketArrivedEventArgs(controller, diagPacket));
}
void IDataPoller.UnsubscribeAll()
{
lock (subscriptionMutex)
{
if (newDiagPacketArrivedEventHandler != null)
{
Delegate[] invocationList = newDiagPacketArrivedEventHandler.GetInvocationList();
for (int i = 0; i < invocationList.Length; i++)
{
EventHandler<NewDiagPacketArrivedEventArgs> value = (EventHandler<NewDiagPacketArrivedEventArgs>)invocationList[i];
NewDiagPacketArrived -= value;
}
}
if (errorOccurred != null)
{
Delegate[] invocationList = errorOccurred.GetInvocationList();
for (int i = 0; i < invocationList.Length; i++)
{
EventHandler<ErrorEventArgs> value2 = (EventHandler<ErrorEventArgs>)invocationList[i];
errorOccurred = (EventHandler<ErrorEventArgs>)Delegate.Remove(errorOccurred, value2);
}
}
}
}
internal void RaiseErrorOccurred(Exception e)
{
EventHandler<ErrorEventArgs> eventHandler = null;
lock (subscriptionMutex)
{
eventHandler = errorOccurred;
}
eventHandler?.Invoke(this, new ErrorEventArgs(controller, e));
}
private void Update(DelegateSpooler spooler)
{
int number = controller.Number;
ControllerDiagPacket data = null;
try
{
data = controller.DataCollection.RetrieveDiagnostics();
}
catch (Exception ex)
{
Exception ex2 = ex;
Exception e = ex2;
spooler.Set(number, delegate
{
controller.ControlCenter.Diagnostics.RaiseErrorOccurred(e);
});
if (!(e is CommunicationException))
{
last = null;
}
return;
}
if (controllerResetting || data == null)
{
return;
}
last = data;
spooler.Set(number, delegate
{
if (!controllerResetting)
{
RaiseNewDiagPacketArrived(data);
}
});
}
}
1. 核心功能
这是一个 诊断数据轮询器,实现了 IDataPoller
接口,主要功能是:
-
定期从控制器(
Controller
)获取诊断数据包(ControllerDiagPacket
)。 -
通过事件机制通知订阅者:
-
新数据到达(
NewDiagPacketArrived
) -
发生错误(
ErrorOccurred
)
-
-
支持 启动/暂停轮询、自动订阅管理 和 线程安全控制。
2. 关键成员分析
(1)线程与轮询控制
-
Threader<Controller>
核心线程管理器,内部通过委托调用Update
方法,实现定时轮询。-
RefreshInterval
属性控制轮询间隔。 -
使用
DelegateSpooler
确保事件在正确的线程上下文中执行。
-
-
enabled
与autoStart
-
enabled
:标记当前是否启用轮询。 -
autoStart
:当首个订阅者注册时自动开始轮询(默认开启)。
-
(2)事件管理
-
事件列表
-
NewDiagPacketArrived
:新数据到达事件。 -
ErrorOccurred
:错误事件(如通信异常)。
-
-
线程安全设计
通过subscriptionMutex
锁保护事件订阅/取消订阅操作,避免多线程冲突。
(3)状态跟踪
-
controllerResetting
volatile
标记控制器是否正在重置,防止在重置期间处理无效数据。 -
last
缓存最后一次有效的诊断数据包(Latest
属性对外暴露)。
3. 核心方法解析
(1)轮询逻辑(Update
方法)
private void Update(DelegateSpooler spooler) {
// 1. 从控制器获取诊断数据
var data = controller.DataCollection.RetrieveDiagnostics();
// 2. 处理异常(如通信失败)
catch (Exception ex) {
spooler.Queue(() => RaiseErrorOccurred(ex)); // 异步触发错误事件
last = null; // 清空缓存
}
// 3. 有效数据到达时更新并通知
if (!controllerResetting && data != null) {
last = data;
spooler.Queue(() => RaiseNewDiagPacketArrived(data));
}
}
-
异常处理:捕获通信异常等,通过
ErrorOccurred
事件通知调用方。 -
状态检查:忽略控制器重置期间的数据。
(2)事件触发
-
RaiseNewDiagPacketArrived
安全地触发NewDiagPacketArrived
事件,传递控制器和诊断数据。 -
RaiseErrorOccurred
类似地触发错误事件,包含异常信息。
(3)订阅管理
-
事件订阅/取消
-
当首个订阅者注册时,若
autoStart=true
则自动启动轮询(Threader.AddSubscriber
)。 -
当最后一个订阅者取消时,自动停止轮询(
Threader.RemoveSubscriber
)。
-
-
UnsubscribeAll
强制清除所有事件订阅者(实现IDataPoller
接口要求)。
4. 线程安全设计
-
锁的应用
-
subscriptionMutex
保护事件订阅逻辑(add/remove
访问器)。 -
volatile
确保多线程对controllerResetting
的可见性。
-
-
委托队列(
DelegateSpooler
)
将事件调用封装为委托任务,由Threader
统一调度,避免跨线程问题。
5. 典型使用场景
var poller = new DiagPacketPoller(controller);
poller.NewDiagPacketArrived += (sender, e) => {
Console.WriteLine($"New data: {e.DiagPacket}");
};
poller.ErrorOccurred += (sender, e) => {
Console.WriteLine($"Error: {e.Exception.Message}");
};
6. 改进建议
-
资源释放
可考虑实现IDisposable
接口,在销毁时主动释放Threader
资源。 -
日志记录
增加详细日志(如轮询周期、异常堆栈),便于调试。 -
取消支持
引入CancellationToken
支持优雅停止轮询。
总结
该类是一个典型的 生产者-消费者模型 实现,通过事件机制解耦数据采集与处理逻辑,适合需要后台轮询硬件状态的工业控制场景。