GO 协程池的浅探

为啥需要协程池

go语言支持多协程并发,具有Goroutine调度的特性。当处理并发问题时 可以开辟多个Goroutine去解决。

虽然go语言在调度Goroutine已经优化的非常完善,开启一个Goroutine的代价非常小。但是,如果无休止的开辟Goroutine会出现高频率的调度Goroutine,那么依然会浪费很多上下文切换的资源。并且每个Goroutine都有一定内存的开销,当量足够大的时候系统内存会承担不住 服务会被自动OOM。

先聊聊并发问题

上半部分是串行,下半部分是并行。明显并发可以

  • - 同一时刻处理多个事务
  • - 节省时间,提高性能效率

并发的条件:

  • - 事务独立
  • - 没有前后结果依赖关系

 go是如何实现并发?

串行


package main

import (
   "fmt"
   "time"
)

func doTask1() {
   time.Sleep(1 * time.Second)
   fmt.Println("test1")
}

func doTask2() {
   time.Sleep(1 * time.Second)
   fmt.Println("test2")
}

func main() {
   beforeTime := time.Now()
   doTask1()
   doTask2()
   runTime := time.Since(beforeTime)
   fmt.Println(runTime)
}

结果

```

test1
test2
2.0007157s


```

并行  task加上go

```go

    ...
    go doTask1()
    doTask2()
    ...


```

结果

```

test1
test2
1.0003288s


```

go协程之间是如何通信的?

使用go语言的另外一个特性channel

```go

package main

import (
    "fmt"
    "time"
)

func doTask1(c chan int) {
    for {
        select {
        case num := <-c:
            fmt.Printf("get %d \n", num)
        }
    }
}

func doTask2(c chan int) {
    for i := 1; i < 10; i++ {
        time.Sleep(1 * time.Second)
        fmt.Printf("put %d \n", i)
        c <- i
    }
}

func main() {
    //创建一个channel
    c := make(chan int, 10)
    go doTask1(c)
    go doTask2(c)
    time.Sleep(12 * time.Second)
}

```

协程池的设计

初步设计上 一个pool 由若干个worker和一个JobsChannel组成。

  • - worker 负责接收来自JobsChannel的task,并且工作执行。
  • - JobsChannel 负责接收进来的每个task然后随机分配给每个worker来处理。
  • -  task 由func执行函数和所需的参数组成。

简单实现

workpool.go```go

import (
    "context"
    "fmt"
    "sync"
)

type TaskFunc func(args ...interface{}) error

type Task struct {
    f    TaskFunc
    args []interface{}
}

func NewTask(f TaskFunc, args ...interface{}) *Task {
    t := Task{
        f:    f,
        args: args,
    }
    return &t
}

func (t *Task) Execute() error {
    err := t.f(t.args...)
    return err
}

type Pool struct {
    //协程池worker数量 (Goroutine个数)
    workerNum int

    //协程池任务队列
    JobsChannel chan *Task

    stopCtx        context.Context
    stopCancelFunc context.CancelFunc
    wg             sync.WaitGroup
}

// 创建一个协程池
func NewPool(cap int, poolLen int) *Pool {
    p := Pool{
        workerNum:   cap,
        JobsChannel: make(chan *Task, poolLen),
    }
    return &p
}


```

workpool_test.go```go

import (
    "fmt"
    "testing"
    "time"
)

func calcAttrTaskFunc() TaskFunc {
    taskFunc := TaskFunc(func(args ...interface{}) error {
        // TODO do someting
        return nil
    })
    return taskFunc
}

func TestCalculator_Workpool(t *testing.T) {

    //创建一个协程池
    p := NewPool(3, 100)

    //启动协程池p
    p.Run()

    //开一个协程 不断的向 Pool 输送task任务
    go func() {
        for i := 1; i < 10; i++ {
            task := NewTask(calcAttrTaskFunc(), i)
            p.JobsChannel <- task
        }
    }()

    time.Sleep(1 * time.Second)
}


```

以上只是简单实现 <u>线程池的并发处理</u> 原理, 那 如果任务执行失败了呢,是不是要重塞重试?再优化下。

workpool.go```go

...
// 加一个任务
func (p *Pool) pushTask(t *Task) {
    p.JobsChannel <- t
}

// 协程池创建一个worker并且开始工作
func (p *Pool) worker(workID int) {
    for {
        select {
        case <-p.stopCtx.Done():
            p.wg.Done()
            fmt.Println("协程池Pool停止")
            return
        case task := <-p.JobsChannel:
            err := task.Execute()
            if err != nil {
                p.pushTask(task)
                continue
            }
            fmt.Println("worker ID ", workID, " 执行 task ID", task.args[0].(int), "任务完毕")
        }
    }
}
...


```

再优化代码更加健壮些,如果worker 15s内没有收到任何任务,则当作timeout回收。

```go

// 协程池创建一个worker并且开始工作
func (p *Pool) worker(workID int) {
    ticker := time.NewTicker(time.Second * 15)
    defer ticker.Stop()
    for {
        select {
        case <-p.stopCtx.Done():
            p.wg.Done()
            fmt.Println("协程池Pool停止")
            return
        case task := <-p.JobsChannel:
            err := task.Execute()
            if err != nil {
                p.pushTask(task)
                continue
            }
            fmt.Println("worker ID ", workID, " 执行 task ID", task.args[0].(int), "任务完毕")
            // 重新设置等待15s
            ticker.Reset(time.Second * 15)
        case <-ticker.C: // 15s 都收不到任何处理就回收
            fmt.Println("worker ID ", workID, " 超时回收")
            return
        }
    }
}


```

但还是会有问题,回收完了 再有任务就处理不了。所以至少保留一个worker,并且当有新任务且达到足够的量时需要重新开启被关闭的worker。```go

type WorkerRecover struct {
    recoverList []int
    lock        sync.RWMutex
}

type Pool struct {
    //协程池worker数量 (Goroutine个数)
    workerNum int

    //协程池任务队列
    JobsChannel chan *Task

    workerRecover *WorkerRecover

    stopCtx        context.Context
    stopCancelFunc context.CancelFunc
    wg             sync.WaitGroup
}

...

// 协程池创建一个worker并且开始工作
func (p *Pool) worker(workID int) {
    ticker := time.NewTicker(time.Second * 15)
    defer ticker.Stop()
    for {
        select {
        case <-p.stopCtx.Done():
            p.wg.Done()
            fmt.Println("协程池Pool停止")
            return
        case task := <-p.JobsChannel:
            err := task.Execute()
            if err != nil {
                p.pushTask(task)
                continue
            }
            fmt.Println("worker ID ", workID, " 执行 task ID", task.args[0].(int), "任务完毕")
            // 重新设置等待15s
            ticker.Reset(time.Second * 15)
        case <-ticker.C: // 15s 都收不到任何处理就回收
            p.workerRecover.lock.Lock()
            if p.workerNum-len(p.workerRecover.recoverList) > 1 {
                fmt.Println("worker ID ", workID, " 超时回收")
                p.workerRecover.recoverList = append(p.workerRecover.recoverList, workID)
            } else {
                ticker.Reset(time.Second * 15)
            }
            p.workerRecover.lock.Unlock()
        }
    }
}

```

不能处理上新加了一个 workerRecover 用于存放回收的worker,至少保留一个worker在工作。

但是呢,如果后面task又越来越多的话,那是不是又会处理不过来了,JobsChannel是不是会阻塞了,所以想着那应该把回收的worker给重新启动。```go

// 加一个任务
func (p *Pool) pushTask(t *Task) {
    select {
    case p.JobsChannel <- t:
    default: // 阻塞了,重新启动worker
        p.Restart()
        time.Sleep(10 * time.Millisecond)
        p.JobsChannel <- t
    }
}

...

// 重启回收的worker
func (p *Pool) Restart() {
    p.workerRecover.lock.Lock()
    defer p.workerRecover.lock.Unlock()
    for _, workID := range p.workerRecover.recoverList {
        fmt.Println("worker ID ", workID, " 重开")
        go p.worker(workID)
    }
}


```

修改下tset,再测试一遍:```go

func TestCalculator_Workpool(t *testing.T) {

    //创建一个协程池
    p := NewPool(3, 10)

    //启动协程池p
    p.Run()

    //var wg sync.WaitGroup
    //wg.Add(10)
    //开一个协程 不断的向 Pool 输送task任务
    go func() {
        for i := 1; i < 10; i++ {
            task := NewTask(calcAttrTaskFunc(), i)
            p.pushTask(task)
            //wg.Done()
        }
    }()
    //wg.Wait()
    time.Sleep(20 * time.Second)
    task := NewTask(calcAttrTaskFunc(), 10)
    p.pushTask(task)
    time.Sleep(1 * time.Second)
    go func() {
        for i := 11; i <= 30; i++ {
            task := NewTask(calcAttrTaskFunc(), i)
            p.pushTask(task)
        }
    }()
    time.Sleep(300 * time.Second)
}


```

测试结果:

```

=== RUN   TestCalculator_Workpool
worker ID  2  执行 task ID 1 任务完毕
worker ID  2  执行 task ID 3 任务完毕
worker ID  2  执行 task ID 4 任务完毕
worker ID  2  执行 task ID 5 任务完毕
worker ID  2  执行 task ID 6 任务完毕
worker ID  2  执行 task ID 7 任务完毕
worker ID  2  执行 task ID 8 任务完毕
worker ID  2  执行 task ID 9 任务完毕
worker ID  1  执行 task ID 2 任务完毕
worker ID  1  超时回收
worker ID  0  超时回收
worker ID  1  执行 task ID 10 任务完毕
worker ID  1  重开
worker ID  0  重开
worker ID  0  执行 task ID 11 任务完毕
worker ID  0  执行 task ID 14 任务完毕
worker ID  0  执行 task ID 15 任务完毕
worker ID  0  执行 task ID 16 任务完毕
worker ID  0  执行 task ID 19 任务完毕
worker ID  0  执行 task ID 20 任务完毕
worker ID  0  执行 task ID 21 任务完毕
worker ID  0  执行 task ID 22 任务完毕
worker ID  0  执行 task ID 23 任务完毕
worker ID  1  执行 task ID 13 任务完毕
worker ID  0  执行 task ID 17 任务完毕
worker ID  1  执行 task ID 18 任务完毕
worker ID  2  执行 task ID 12 任务完毕
worker ID  0  执行 task ID 24 任务完毕
worker ID  0  执行 task ID 29 任务完毕
worker ID  0  执行 task ID 30 任务完毕
worker ID  0  执行 task ID 26 任务完毕
worker ID  1  执行 task ID 27 任务完毕
worker ID  2  执行 task ID 28 任务完毕
worker ID  1  执行 task ID 25 任务完毕


```

后续有空

待尝试 加入管理worker的管理者manager。

以及尝试实践在 游戏中的不同系统属性加成 并发体验

分数阶傅里叶变换(Fractional Fourier Transform, FRFT)是对传统傅里叶变换的拓展,它通过非整数阶的变换方式,能够更有效地处理非线性信号以及涉及时频局部化的问题。在信号处理领域,FRFT尤其适用于分析非平稳信号,例如在雷达、声纳和通信系统中,对线性调频(Linear Frequency Modulation, LFM)信号的分析具有显著优势。LFM信号是一种频率随时间线性变化的信号,因其具有宽频带和良好的时频分辨率,被广泛应用于雷达和通信系统。FRFT能够更精准地捕捉LFM信号的时间和频率信息,相比普通傅里叶变换,其性能更为出色。 MATLAB是一种强大的数值计算和科学计算工具,拥有丰富的函数库和用户友好的界面。在MATLAB中实现FRFT,通常需要编写自定义函数或利用信号处理工具箱中的相关函数。例如,一个名为“frft”的文件可能是用于执行分数阶傅里叶变换的MATLAB脚本或函数,并展示其在信号处理中的应用。FRFT的正确性验证通常通过对比变换前后信号的特性来完成,比如评估信号的重构质量、信噪比等。具体而言,可以通过计算原始信号与经过FRFT处理后的信号之间的相似度,或者对比LFM信号的关键参数(如初始频率、扫频率和持续时间)是否在变换后得到准确恢复。 在MATLAB代码实现中,通常包含以下步骤:首先,生成LFM信号模型,设定其初始频率、扫频率、持续时间和采样率等参数;其次,利用自定义的frft函数对LFM信号进行分数阶傅里叶变换;接着,使用MATLAB的可视化工具(如plot或imagesc)展示原始信号的时域和频域表示,以及FRFT后的结果,以便直观对比;最后,通过计算均方误差、峰值信噪比等指标来评估FRFT的性能。深入理解FRFT的数学原理并结合MATLAB编程技巧,可以实现对LFM信号的有效分析和处理。这个代码示例不仅展示了理论知识在
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值