internal class Threader<TObject>
{
internal delegate void Caller(TObject obj, DelegateSpooler spooler);
private List<TObject> toPoll = new List<TObject>();
private Thread worker;
private ThreadPriority threadPriority = ThreadPriority.Normal;
private bool keepRunning;
private object threadMutex = new object();
private object subscriptionMutex = new object();
private Caller caller;
private string name = "";
private DelegateSpooler spooler;
private int interval;
internal DelegateSpooler Spooler => spooler;
public int Interval
{
get
{
return interval;
}
set
{
if (value <= 0)
{
throw new ArgumentOutOfRangeException("value", string.Format(Resources.ArgumentMustBePositive, "value"));
}
interval = value;
}
}
internal Threader(Caller caller, string name)
{
ResetInterval();
this.caller = caller;
spooler = new DelegateSpooler(name);
this.name = name;
}
internal void ResetInterval()
{
interval = 100;
}
public void AddSubscriber(TObject controller)
{
lock (subscriptionMutex)
{
if (!toPoll.Contains(controller))
{
toPoll.Add(controller);
}
lock (threadMutex)
{
if (worker == null)
{
keepRunning = true;
worker = new Thread(InternalUtilities.WrapDelegateForCulture(bgWorker));
worker.Name = typeof(Threader<TObject>).Name + "<" + name + ">";
worker.Priority = threadPriority;
worker.IsBackground = true;
worker.Start();
}
}
}
}
public void RemoveSubscriber(TObject controller, int number)
{
lock (subscriptionMutex)
{
toPoll.Remove(controller);
spooler.Set(number, null);
}
}
public void StopAll()
{
spooler.ClearQueue();
keepRunning = false;
Thread thread;
lock (threadMutex)
{
thread = worker;
}
thread?.Join();
}
private void bgWorker()
{
try
{
while (keepRunning)
{
TObject[] array;
lock (subscriptionMutex)
{
lock (threadMutex)
{
array = toPoll.ToArray();
if (array.Length == 0)
{
worker = null;
break;
}
}
}
TObject[] array2 = array;
foreach (TObject obj in array2)
{
caller(obj, spooler);
}
Thread.Sleep(interval);
}
}
finally
{
lock (threadMutex)
{
worker = null;
}
}
}
}
以下是针对 Threader<TObject>
类的详细中文分析:
1. 核心功能
这是一个 泛型线程管理器,用于对一组对象(TObject
)进行 后台轮询操作,核心功能包括:
-
定时调用委托方法(通过
Caller
委托)。 -
线程安全的对象订阅管理(
AddSubscriber
/RemoveSubscriber
)。 -
异步任务队列(通过
DelegateSpooler
处理回调)。
2. 关键成员分析
(1)线程控制相关
-
worker
线程
后台执行轮询的核心线程,通过bgWorker
方法实现循环逻辑。 -
keepRunning
标志
控制线程是否继续运行(true
运行,false
停止)。 -
threadMutex
锁
保护线程状态(如启动/停止)的线程安全。
(2)轮询配置
-
interval
轮询间隔时间(毫秒),通过Interval
属性可修改(值必须为正数)。 -
Caller
委托
定义轮询时执行的逻辑(由构造函数传入)。
(3)订阅管理
-
toPoll
列表
存储所有需要轮询的对象(泛型TObject
)。 -
subscriptionMutex
锁
保护订阅列表的线程安全操作。
(4)辅助工具
-
DelegateSpooler
委托任务队列(通过Spooler
属性暴露),用于异步执行回调。
3. 核心方法解析
(1)线程生命周期管理
-
AddSubscriber
public void AddSubscriber(TObject controller) {
lock (subscriptionMutex) {
if (!toPoll.Contains(controller)) {
toPoll.Add(controller);
}
// 首次订阅时启动线程
if (worker == null) {
keepRunning = true;
worker = new Thread(bgWorker);
worker.Start();
}
}
}
-
-
添加订阅对象,若线程未启动则创建并启动新线程。
-
线程安全:通过双锁(
subscriptionMutex
+threadMutex
)避免竞争条件。
-
-
RemoveSubscriber
public void RemoveSubscriber(TObject controller, int number) {
lock (subscriptionMutex) {
toPoll.Remove(controller);
spooler.Set(number, null); // 清理对应任务
}
}
-
-
StopAll
public void StopAll() {
spooler.ClearQueue(); // 清空任务队列
keepRunning = false; // 停止线程循环
worker?.Join(); // 等待线程结束
}
-
-
强制停止所有轮询,阻塞直到线程退出。
-
(2)轮询逻辑(bgWorker
)
private void bgWorker() {
while (keepRunning) {
TObject[] targets;
lock (subscriptionMutex) {
lock (threadMutex) {
targets = toPoll.ToArray();
if (targets.Length == 0) {
worker = null; // 无订阅对象时自动停止
break;
}
}
}
// 遍历所有对象执行轮询
foreach (var obj in targets) {
caller(obj, spooler); // 调用构造函数传入的委托
}
Thread.Sleep(interval); // 等待间隔
}
}
-
双锁检查:确保订阅列表和线程状态同步。
-
自动停止:当无订阅对象时,线程自动退出。
4. 线程安全设计
-
锁的粒度
-
subscriptionMutex
:保护toPoll
列表的修改。 -
threadMutex
:保护worker
线程的状态变更。
-
-
无竞态条件
关键操作(如启动线程、修改列表)均通过锁隔离。
5. 典型使用场景
// 1. 定义轮询逻辑
var threader = new Threader<Controller>((controller, spooler) => {
var data = controller.GetData();
spooler.Set(controller.Id, () => Console.WriteLine(data));
}, "DataPoller");
// 2. 添加订阅对象
threader.AddSubscriber(controller1);
// 3. 停止所有轮询
threader.StopAll();
改进建议
-
异常处理
bgWorker
中的异常未向上传递,可增加全局错误事件通知。 -
性能优化
-
使用
ReaderWriterLockSlim
替代lock
以提高读并发。 -
避免在循环中频繁
ToArray
(可缓存列表快照)。
-
-
资源释放
实现IDisposable
接口确保线程和队列的清理。
总结
Threader<TObject>
是一个高效的后台轮询框架,其核心价值在于:
-
泛型设计:可适配任意类型的对象轮询。
-
安全的线程管理:通过锁和状态标志避免资源冲突。
-
灵活的委托机制:允许外部定义轮询行为。
适用于需要 多对象定时检查 的场景(如设备监控、数据采集等)。