一个泛型线程管理器实现Threader<TObject>

internal class Threader<TObject>
{
    internal delegate void Caller(TObject obj, DelegateSpooler spooler);

    private List<TObject> toPoll = new List<TObject>();

    private Thread worker;

    private ThreadPriority threadPriority = ThreadPriority.Normal;

    private bool keepRunning;

    private object threadMutex = new object();

    private object subscriptionMutex = new object();

    private Caller caller;

    private string name = "";

    private DelegateSpooler spooler;

    private int interval;

    internal DelegateSpooler Spooler => spooler;

    public int Interval
    {
        get
        {
            return interval;
        }
        set
        {
            if (value <= 0)
            {
                throw new ArgumentOutOfRangeException("value", string.Format(Resources.ArgumentMustBePositive, "value"));
            }

            interval = value;
        }
    }

    internal Threader(Caller caller, string name)
    {
        ResetInterval();
        this.caller = caller;
        spooler = new DelegateSpooler(name);
        this.name = name;
    }

    internal void ResetInterval()
    {
        interval = 100;
    }

    public void AddSubscriber(TObject controller)
    {
        lock (subscriptionMutex)
        {
            if (!toPoll.Contains(controller))
            {
                toPoll.Add(controller);
            }

            lock (threadMutex)
            {
                if (worker == null)
                {
                    keepRunning = true;
                    worker = new Thread(InternalUtilities.WrapDelegateForCulture(bgWorker));
                    worker.Name = typeof(Threader<TObject>).Name + "<" + name + ">";
                    worker.Priority = threadPriority;
                    worker.IsBackground = true;
                    worker.Start();
                }
            }
        }
    }

    public void RemoveSubscriber(TObject controller, int number)
    {
        lock (subscriptionMutex)
        {
            toPoll.Remove(controller);
            spooler.Set(number, null);
        }
    }

    public void StopAll()
    {
        spooler.ClearQueue();
        keepRunning = false;
        Thread thread;
        lock (threadMutex)
        {
            thread = worker;
        }

        thread?.Join();
    }

    private void bgWorker()
    {
        try
        {
            while (keepRunning)
            {
                TObject[] array;
                lock (subscriptionMutex)
                {
                    lock (threadMutex)
                    {
                        array = toPoll.ToArray();
                        if (array.Length == 0)
                        {
                            worker = null;
                            break;
                        }
                    }
                }

                TObject[] array2 = array;
                foreach (TObject obj in array2)
                {
                    caller(obj, spooler);
                }

                Thread.Sleep(interval);
            }
        }
        finally
        {
            lock (threadMutex)
            {
                worker = null;
            }
        }
    }
}

以下是针对 Threader<TObject> 类的详细中文分析:


1. 核心功能

这是一个 泛型线程管理器,用于对一组对象(TObject)进行 后台轮询操作,核心功能包括:

  • 定时调用委托方法(通过 Caller 委托)。

  • 线程安全的对象订阅管理AddSubscriber/RemoveSubscriber)。

  • 异步任务队列(通过 DelegateSpooler 处理回调)。


2. 关键成员分析

(1)线程控制相关
  • worker 线程
    后台执行轮询的核心线程,通过 bgWorker 方法实现循环逻辑。

  • keepRunning 标志
    控制线程是否继续运行(true 运行,false 停止)。

  • threadMutex 锁
    保护线程状态(如启动/停止)的线程安全。

(2)轮询配置
  • interval
    轮询间隔时间(毫秒),通过 Interval 属性可修改(值必须为正数)。

  • Caller 委托
    定义轮询时执行的逻辑(由构造函数传入)。

(3)订阅管理
  • toPoll 列表
    存储所有需要轮询的对象(泛型 TObject)。

  • subscriptionMutex 锁
    保护订阅列表的线程安全操作。

(4)辅助工具
  • DelegateSpooler
    委托任务队列(通过 Spooler 属性暴露),用于异步执行回调。


3. 核心方法解析

(1)线程生命周期管理
  • AddSubscriber

public void AddSubscriber(TObject controller) {
    lock (subscriptionMutex) {
        if (!toPoll.Contains(controller)) {
            toPoll.Add(controller);
        }
        // 首次订阅时启动线程
        if (worker == null) {
            keepRunning = true;
            worker = new Thread(bgWorker);
            worker.Start();
        }
    }
}
    • 添加订阅对象,若线程未启动则创建并启动新线程。

    • 线程安全:通过双锁(subscriptionMutex + threadMutex)避免竞争条件。

  • RemoveSubscriber

public void RemoveSubscriber(TObject controller, int number) {
    lock (subscriptionMutex) {
        toPoll.Remove(controller);
        spooler.Set(number, null); // 清理对应任务
    }
}
    • StopAll

    public void StopAll() {
        spooler.ClearQueue(); // 清空任务队列
        keepRunning = false;  // 停止线程循环
        worker?.Join();      // 等待线程结束
    }
      • 强制停止所有轮询,阻塞直到线程退出。

    (2)轮询逻辑(bgWorker
    private void bgWorker() {
        while (keepRunning) {
            TObject[] targets;
            lock (subscriptionMutex) {
                lock (threadMutex) {
                    targets = toPoll.ToArray();
                    if (targets.Length == 0) {
                        worker = null; // 无订阅对象时自动停止
                        break;
                    }
                }
            }
            // 遍历所有对象执行轮询
            foreach (var obj in targets) {
                caller(obj, spooler); // 调用构造函数传入的委托
            }
            Thread.Sleep(interval);    // 等待间隔
        }
    }
    • 双锁检查:确保订阅列表和线程状态同步。

    • 自动停止:当无订阅对象时,线程自动退出。


    4. 线程安全设计

    • 锁的粒度

      • subscriptionMutex:保护 toPoll 列表的修改。

      • threadMutex:保护 worker 线程的状态变更。

    • 无竞态条件
      关键操作(如启动线程、修改列表)均通过锁隔离。


    5. 典型使用场景

    // 1. 定义轮询逻辑
    var threader = new Threader<Controller>((controller, spooler) => {
        var data = controller.GetData();
        spooler.Set(controller.Id, () => Console.WriteLine(data));
    }, "DataPoller");
    
    // 2. 添加订阅对象
    threader.AddSubscriber(controller1);
    
    // 3. 停止所有轮询
    threader.StopAll();

     改进建议

    1. 异常处理
      bgWorker 中的异常未向上传递,可增加全局错误事件通知。

    2. 性能优化

      • 使用 ReaderWriterLockSlim 替代 lock 以提高读并发。

      • 避免在循环中频繁 ToArray(可缓存列表快照)。

    3. 资源释放
      实现 IDisposable 接口确保线程和队列的清理。


    总结

    Threader<TObject> 是一个高效的后台轮询框架,其核心价值在于:

    • 泛型设计:可适配任意类型的对象轮询。

    • 安全的线程管理:通过锁和状态标志避免资源冲突。

    • 灵活的委托机制:允许外部定义轮询行为。

    适用于需要 多对象定时检查 的场景(如设备监控、数据采集等)。

    评论
    添加红包

    请填写红包祝福语或标题

    红包个数最小为10个

    红包金额最低5元

    当前余额3.43前往充值 >
    需支付:10.00
    成就一亿技术人!
    领取后你会自动成为博主和红包主的粉丝 规则
    hope_wisdom
    发出的红包
    实付
    使用余额支付
    点击重新获取
    扫码支付
    钱包余额 0

    抵扣说明:

    1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
    2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

    余额充值