为啥需要协程池
go语言支持多协程并发,具有Goroutine调度的特性。当处理并发问题时 可以开辟多个Goroutine去解决。
虽然go语言在调度Goroutine已经优化的非常完善,开启一个Goroutine的代价非常小。但是,如果无休止的开辟Goroutine会出现高频率的调度Goroutine,那么依然会浪费很多上下文切换的资源。并且每个Goroutine都有一定内存的开销,当量足够大的时候系统内存会承担不住 服务会被自动OOM。
先聊聊并发问题
上半部分是串行,下半部分是并行。明显并发可以
- - 同一时刻处理多个事务
- - 节省时间,提高性能效率
并发的条件:
- - 事务独立
- - 没有前后结果依赖关系
go是如何实现并发?
串行
package main
import (
"fmt"
"time"
)
func doTask1() {
time.Sleep(1 * time.Second)
fmt.Println("test1")
}
func doTask2() {
time.Sleep(1 * time.Second)
fmt.Println("test2")
}
func main() {
beforeTime := time.Now()
doTask1()
doTask2()
runTime := time.Since(beforeTime)
fmt.Println(runTime)
}
结果
```
test1
test2
2.0007157s
```
并行 task加上go
```go
...
go doTask1()
doTask2()
...
```
结果
```
test1
test2
1.0003288s
```
go协程之间是如何通信的?
使用go语言的另外一个特性channel
```go
package main
import (
"fmt"
"time"
)
func doTask1(c chan int) {
for {
select {
case num := <-c:
fmt.Printf("get %d \n", num)
}
}
}
func doTask2(c chan int) {
for i := 1; i < 10; i++ {
time.Sleep(1 * time.Second)
fmt.Printf("put %d \n", i)
c <- i
}
}
func main() {
//创建一个channel
c := make(chan int, 10)
go doTask1(c)
go doTask2(c)
time.Sleep(12 * time.Second)
}
```
协程池的设计
初步设计上 一个pool 由若干个worker和一个JobsChannel组成。
- - worker 负责接收来自JobsChannel的task,并且工作执行。
- - JobsChannel 负责接收进来的每个task然后随机分配给每个worker来处理。
- - task 由func执行函数和所需的参数组成。
简单实现
workpool.go```go
import (
"context"
"fmt"
"sync"
)
type TaskFunc func(args ...interface{}) error
type Task struct {
f TaskFunc
args []interface{}
}
func NewTask(f TaskFunc, args ...interface{}) *Task {
t := Task{
f: f,
args: args,
}
return &t
}
func (t *Task) Execute() error {
err := t.f(t.args...)
return err
}
type Pool struct {
//协程池worker数量 (Goroutine个数)
workerNum int
//协程池任务队列
JobsChannel chan *Task
stopCtx context.Context
stopCancelFunc context.CancelFunc
wg sync.WaitGroup
}
// 创建一个协程池
func NewPool(cap int, poolLen int) *Pool {
p := Pool{
workerNum: cap,
JobsChannel: make(chan *Task, poolLen),
}
return &p
}
```
workpool_test.go```go
import ( "fmt" "testing" "time" ) func calcAttrTaskFunc() TaskFunc { taskFunc := TaskFunc(func(args ...interface{}) error { // TODO do someting return nil }) return taskFunc } func TestCalculator_Workpool(t *testing.T) { //创建一个协程池 p := NewPool(3, 100) //启动协程池p p.Run() //开一个协程 不断的向 Pool 输送task任务 go func() { for i := 1; i < 10; i++ { task := NewTask(calcAttrTaskFunc(), i) p.JobsChannel <- task } }() time.Sleep(1 * time.Second) }
```
以上只是简单实现 <u>线程池的并发处理</u> 原理, 那 如果任务执行失败了呢,是不是要重塞重试?再优化下。
workpool.go```go
...
// 加一个任务
func (p *Pool) pushTask(t *Task) {
p.JobsChannel <- t
}
// 协程池创建一个worker并且开始工作
func (p *Pool) worker(workID int) {
for {
select {
case <-p.stopCtx.Done():
p.wg.Done()
fmt.Println("协程池Pool停止")
return
case task := <-p.JobsChannel:
err := task.Execute()
if err != nil {
p.pushTask(task)
continue
}
fmt.Println("worker ID ", workID, " 执行 task ID", task.args[0].(int), "任务完毕")
}
}
}
...
```
再优化代码更加健壮些,如果worker 15s内没有收到任何任务,则当作timeout回收。
```go
// 协程池创建一个worker并且开始工作
func (p *Pool) worker(workID int) {
ticker := time.NewTicker(time.Second * 15)
defer ticker.Stop()
for {
select {
case <-p.stopCtx.Done():
p.wg.Done()
fmt.Println("协程池Pool停止")
return
case task := <-p.JobsChannel:
err := task.Execute()
if err != nil {
p.pushTask(task)
continue
}
fmt.Println("worker ID ", workID, " 执行 task ID", task.args[0].(int), "任务完毕")
// 重新设置等待15s
ticker.Reset(time.Second * 15)
case <-ticker.C: // 15s 都收不到任何处理就回收
fmt.Println("worker ID ", workID, " 超时回收")
return
}
}
}
```
但还是会有问题,回收完了 再有任务就处理不了。所以至少保留一个worker,并且当有新任务且达到足够的量时需要重新开启被关闭的worker。```go
type WorkerRecover struct {
recoverList []int
lock sync.RWMutex
}
type Pool struct {
//协程池worker数量 (Goroutine个数)
workerNum int
//协程池任务队列
JobsChannel chan *Task
workerRecover *WorkerRecover
stopCtx context.Context
stopCancelFunc context.CancelFunc
wg sync.WaitGroup
}
...
// 协程池创建一个worker并且开始工作
func (p *Pool) worker(workID int) {
ticker := time.NewTicker(time.Second * 15)
defer ticker.Stop()
for {
select {
case <-p.stopCtx.Done():
p.wg.Done()
fmt.Println("协程池Pool停止")
return
case task := <-p.JobsChannel:
err := task.Execute()
if err != nil {
p.pushTask(task)
continue
}
fmt.Println("worker ID ", workID, " 执行 task ID", task.args[0].(int), "任务完毕")
// 重新设置等待15s
ticker.Reset(time.Second * 15)
case <-ticker.C: // 15s 都收不到任何处理就回收
p.workerRecover.lock.Lock()
if p.workerNum-len(p.workerRecover.recoverList) > 1 {
fmt.Println("worker ID ", workID, " 超时回收")
p.workerRecover.recoverList = append(p.workerRecover.recoverList, workID)
} else {
ticker.Reset(time.Second * 15)
}
p.workerRecover.lock.Unlock()
}
}
}
```
不能处理上新加了一个 workerRecover 用于存放回收的worker,至少保留一个worker在工作。
但是呢,如果后面task又越来越多的话,那是不是又会处理不过来了,JobsChannel是不是会阻塞了,所以想着那应该把回收的worker给重新启动。```go
// 加一个任务
func (p *Pool) pushTask(t *Task) {
select {
case p.JobsChannel <- t:
default: // 阻塞了,重新启动worker
p.Restart()
time.Sleep(10 * time.Millisecond)
p.JobsChannel <- t
}
}
...
// 重启回收的worker
func (p *Pool) Restart() {
p.workerRecover.lock.Lock()
defer p.workerRecover.lock.Unlock()
for _, workID := range p.workerRecover.recoverList {
fmt.Println("worker ID ", workID, " 重开")
go p.worker(workID)
}
}
```
修改下tset,再测试一遍:```go
func TestCalculator_Workpool(t *testing.T) {
//创建一个协程池
p := NewPool(3, 10)
//启动协程池p
p.Run()
//var wg sync.WaitGroup
//wg.Add(10)
//开一个协程 不断的向 Pool 输送task任务
go func() {
for i := 1; i < 10; i++ {
task := NewTask(calcAttrTaskFunc(), i)
p.pushTask(task)
//wg.Done()
}
}()
//wg.Wait()
time.Sleep(20 * time.Second)
task := NewTask(calcAttrTaskFunc(), 10)
p.pushTask(task)
time.Sleep(1 * time.Second)
go func() {
for i := 11; i <= 30; i++ {
task := NewTask(calcAttrTaskFunc(), i)
p.pushTask(task)
}
}()
time.Sleep(300 * time.Second)
}
```
测试结果:
```
=== RUN TestCalculator_Workpool
worker ID 2 执行 task ID 1 任务完毕
worker ID 2 执行 task ID 3 任务完毕
worker ID 2 执行 task ID 4 任务完毕
worker ID 2 执行 task ID 5 任务完毕
worker ID 2 执行 task ID 6 任务完毕
worker ID 2 执行 task ID 7 任务完毕
worker ID 2 执行 task ID 8 任务完毕
worker ID 2 执行 task ID 9 任务完毕
worker ID 1 执行 task ID 2 任务完毕
worker ID 1 超时回收
worker ID 0 超时回收
worker ID 1 执行 task ID 10 任务完毕
worker ID 1 重开
worker ID 0 重开
worker ID 0 执行 task ID 11 任务完毕
worker ID 0 执行 task ID 14 任务完毕
worker ID 0 执行 task ID 15 任务完毕
worker ID 0 执行 task ID 16 任务完毕
worker ID 0 执行 task ID 19 任务完毕
worker ID 0 执行 task ID 20 任务完毕
worker ID 0 执行 task ID 21 任务完毕
worker ID 0 执行 task ID 22 任务完毕
worker ID 0 执行 task ID 23 任务完毕
worker ID 1 执行 task ID 13 任务完毕
worker ID 0 执行 task ID 17 任务完毕
worker ID 1 执行 task ID 18 任务完毕
worker ID 2 执行 task ID 12 任务完毕
worker ID 0 执行 task ID 24 任务完毕
worker ID 0 执行 task ID 29 任务完毕
worker ID 0 执行 task ID 30 任务完毕
worker ID 0 执行 task ID 26 任务完毕
worker ID 1 执行 task ID 27 任务完毕
worker ID 2 执行 task ID 28 任务完毕
worker ID 1 执行 task ID 25 任务完毕
```
后续有空
待尝试 加入管理worker的管理者manager。
以及尝试实践在 游戏中的不同系统属性加成 并发体验