Golang笔记——优秀的并发实现

大家好,这里是Good Note,关注 公主号:Goodnote,专栏文章私信限时Free。本文介绍Go并发,同步,顺序执行,设计的一些常见的场景,顺序执行主要用channel实现。在这种同步信号的使用场景中,使用无缓冲通道,可以选择不关闭通道。

在这里插入图片描述

协程同步背景介绍

无缓冲通道(unbuffered channel)通常用于同步 Goroutine 的执行顺序。它的作用是让 Goroutines 在指定的时刻按顺序执行,而不是用来传递数据。因此,在这种同步信号传递的场景中,不需要关闭通道,程序依然能够正常运行。

无缓冲通道的作用

无缓冲通道的特性是:

  • 发送方必须等待接收方接收数据,才能继续发送下一个数据。
  • 它本质上是一个同步机制,用于协调不同 Goroutine 的执行顺序

这种同步机制不涉及数据传输,只是信号传递。

为什么不需要关闭通道

  • 同步控制:在你的代码中,通道仅仅用于同步 Goroutine 的执行,而不用于传输实际的数据。在同步场景下,通道充当的是“信号”的角色。接收方只关心信号的到来,处理完后就继续执行,通道是否关闭不会影响这一行为。

  • 没有资源泄漏:由于通道没有存储任何数据,而是仅用于发送和接收信号,通道的关闭不会带来资源泄漏。即使不关闭通道,程序也会正确地运行,因为没有对通道进行进一步的发送操作。

  • 避免复杂性:关闭通道通常用于“通知接收方数据已经完成传输”,而在这种情况下,你只是在等待信号,不需要关心通道的关闭状态。因此,不关闭通道反而使得程序逻辑更简洁,避免了复杂的资源管理。

  • 垃圾回收机制close():Go 的垃圾回收机制会自动处理那些不再使用的对象和数据结构,包括通道。所以即使没有显式关闭通道,程序结束时,未关闭的通道也会被垃圾回收。

何时需要关闭通道

在其他情况下,关闭通道是必要的,尤其是在以下几种场景:

  • 多接收方模式:如果你有多个接收方从同一个通道接收数据,关闭通道可以通知所有接收方没有更多的数据可以接收。

  • 通知没有更多数据:如果通道用于传输数据,关闭通道可以标识发送方不再有数据发送,这对于避免接收方阻塞和避免无限等待是非常重要的。

总结

无缓冲通道 的同步信号传递场景下:

  • 不关闭通道是合理的,因为通道的作用仅仅是控制同步,而不是用于传输数据。
  • 通道的关闭通常是在传输数据时,通知接收方“没有更多数据了”,而在同步信号的场景中,这种关闭操作并不必要。

所以,在 顺序执行的同步场景 中,如果你只是通过无缓冲通道传递信号,不关闭通道完全没有问题。

常见的同步场景

顺序执行10个Goroutine

这里选择了关闭channel,其实可以删掉close(),后续的例子不会再关闭channel。

package main

import (
	"fmt"
	"time"
)

func main() {
	// 创建多个无缓冲的 Channel,用来控制 Goroutine 的顺序
	steps := make([]chan struct{}, 10)
	for i := 0; i < 10; i++ {
		steps[i] = make(chan struct{})
	}

	// 定义 10 个 Goroutine,按照顺序执行
	for i := 0; i < 10; i++ {
		go func(i int) {
			if i == 0 {
				// 第一个 Goroutine 不需要等待
				fmt.Println("Goroutine 1: Start")
				time.Sleep(1 * time.Second) // 模拟工作
				fmt.Println("Goroutine 1: Done")
				// 通知 Goroutine 2 可以开始
				steps[0] <- struct{}{}
				close(steps[0])
			} else {
				// 等待上一个 Goroutine 完成
				<-steps[i-1]
				fmt.Printf("Goroutine %d: Start\n", i+1)
				time.Sleep(1 * time.Second) // 模拟工作
				fmt.Printf("Goroutine %d: Done\n", i+1)
				// 通知下一个 Goroutine 可以开始
				if i < 9 {
					steps[i] <- struct{}{}
					close(steps[i])
				} else {
					// 最后一个 Goroutine,通知主线程完成
					steps[9] <- struct{}{}
					close(steps[9])
				}
			}
		}(i)
	}

	// 等待最后一个 Goroutine 完成
	<-steps[9]
	fmt.Println("All Goroutines Finished!")
}

输出:

Goroutine 1: Start
Goroutine 1: Done
Goroutine 2: Start
Goroutine 2: Done
Goroutine 3: Start
Goroutine 3: Done
Goroutine 4: Start
Goroutine 4: Done
Goroutine 5: Start
Goroutine 5: Done
Goroutine 6: Start
Goroutine 6: Done
Goroutine 7: Start
Goroutine 7: Done
Goroutine 8: Start
Goroutine 8: Done
Goroutine 9: Start
Goroutine 9: Done
Goroutine 10: Start
Goroutine 10: Done
All Goroutines Finished!

两个 Goroutines 的交替执行,交替打印偶数和奇数

package main

import (
	"fmt"
)

func main() {
	// 定义两个channel,分别用于控制打印顺序
	ch1 := make(chan struct{})
	ch2 := make(chan struct{})
	done := make(chan struct{}) // 用于通知主协程完成

	// 启动第一个goroutine,负责打印偶数
	go func() {
		for i := 0; i <= 9; i += 2 {
			<-ch1             // 等待信号
			fmt.Println(i)    // 打印当前偶数
			ch2 <- struct{}{} // 通知另一个goroutine
		}
	}()

	// 启动第二个goroutine,负责打印奇数
	go func() {
		for i := 1; i <= 9; i += 2 {
			<-ch2          // 等待信号
			fmt.Println(i) // 打印当前奇数
			if i == 9 {
				done <- struct{}{} // 打印完成通知主协程
			} else {
				ch1 <- struct{}{} // 通知另一个goroutine
			}
		}
	}()

	// 主协程启动打印过程
	ch1 <- struct{}{} // 先给ch1发送信号,开始打印偶数

	// 主协程等待所有任务完成
	<-done
}

输出:

0
1
2
3
4
5
6
7
8
9

两个 server 的任意一个执行完成,就执行第三个

这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。

package main

import (
	"fmt"
	"time"
)

func server1(ch chan string) {
	time.Sleep(1 * time.Second)
	ch <- "server1"
}
func server2(ch chan string) {
	time.Sleep(1 * time.Second)
	ch <- "server2"
}
func main() {
	for i := 0; i < 5; i++ {
		output1 := make(chan string)
		output2 := make(chan string)
		go server1(output1)
		go server2(output2)
		select {
		case s1 := <-output1:
			fmt.Println(s1, "server3")
		case s2 := <-output2:
			fmt.Println(s2, "server3")
		}
		fmt.Println("--------------")
	}
	fmt.Println("5组执行完成")
}

输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。

server1 server3
--------------
server2 server3
--------------
server1 server3
--------------
server1 server3
--------------
server1 server3
--------------
5组执行完成

说明:
select如果加上default会直接命中default,不会等待两个通道。

两个 server 必须全部执行完成,再执行第三个。

这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。

package main

import (
	"fmt"
	"time"
)

func server1(ch chan string) {
	time.Sleep(1 * time.Second)
	ch <- "server1"
}
func server2(ch chan string) {
	time.Sleep(1 * time.Second)
	ch <- "server2"
}
func main() {
	for i := 0; i < 5; i++ {
		output1 := make(chan string)
		output2 := make(chan string)
		go server1(output1)
		go server2(output2)
		for i := 0; i < 2; i++ {
			select {
			case s1 := <-output1:
				fmt.Println(s1)
			case s2 := <-output2:
				fmt.Println(s2)
			}
		}
		fmt.Println("server3")
		fmt.Println("--------------")
	}
	fmt.Println("5组执行完成")
}

上面使用了for+select控制2个server完成,也可以使用WaitGroup,如下:

package main

import (
	"fmt"
	"sync"
	"time"
)

func server1(wg *sync.WaitGroup) {
	defer wg.Done() // 在函数退出时通知 WaitGroup

	time.Sleep(1 * time.Second)
	fmt.Println("server1")
}
func server2(wg *sync.WaitGroup) {
	defer wg.Done() // 在函数退出时通知 WaitGroup
	time.Sleep(1 * time.Second)
	fmt.Println("server2")
}
func main() {
	for i := 0; i < 5; i++ {
		wg := sync.WaitGroup{}
		wg.Add(2)
		go server1(&wg)
		go server2(&wg)
		wg.Wait()

		fmt.Println("server3")
		fmt.Println("--------------")
	}
	fmt.Println("5组执行完成")
}

输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。

server1
server2
server3
--------------
server1
server2
server3
--------------
server2
server1
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成

三个 server 的必须按 1 2 3 顺序执行

这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。

package main

import (
	"fmt"
	"sync"
	"time"
)

func server1(ch chan string, wg *sync.WaitGroup) {
	defer wg.Done()             // 在函数退出时通知 WaitGroup
	time.Sleep(1 * time.Second) // 模拟工作
	ch <- "server1"
}

func server2(ch chan string, wg *sync.WaitGroup) {
	defer wg.Done()             // 在函数退出时通知 WaitGroup
	time.Sleep(1 * time.Second) // 模拟工作
	ch <- "server2"
}

func main() {
	for i := 0; i < 5; i++ {
		output1 := make(chan string)
		output2 := make(chan string)
		var wg sync.WaitGroup

		// 启动 server1 和 server2
		wg.Add(2) // 等待两个 Goroutine 完成
		go server1(output1, &wg)
		go server2(output2, &wg)

		// 确保按顺序输出 server1, server2, server3
		s1 := <-output1
		s2 := <-output2
		fmt.Println(s1)
		fmt.Println(s2)
		fmt.Println("server3")
		fmt.Println("--------------")
	}

	// 完成5组任务
	fmt.Println("5组执行完成")
}

输出:

server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成

一个生产者Goroutine,多个消费者Goroutine(每条消息,一次消费)

此处生产者生产的每个消息,只会有一个消费者消费,并发消费。

package main

import (
	"fmt"
	"sync"
	"time"
)

func producer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 1; i <= 5; i++ {
		fmt.Printf("Produced: %d\n", i)
		ch <- i // 向通道发送数据
		time.Sleep(time.Second)
	}
	close(ch) // 生产者完成后关闭通道
}

func consumer(id int, ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range ch { // 从通道读取任务直到通道关闭
		fmt.Printf("Consumer %d processing task: %d\n", id, task)
		time.Sleep(2 * time.Second) // 模拟消费任务的延时
	}
}

func main() {
	var wg sync.WaitGroup
	tasks := make(chan int, 10) // 定义缓冲区大小为10的任务通道

	// 启动多个消费者
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go consumer(i, tasks, &wg)
	}

	// 启动生产者
	wg.Add(1)
	go producer(tasks, &wg)

	// 等待所有消费者完成工作
	wg.Wait()
	fmt.Println("All tasks completed.")
}

输出:

Produced: 1
Consumer 2 processing task: 1
Produced: 2
Consumer 1 processing task: 2
Produced: 3
Consumer 3 processing task: 3
Produced: 4
Consumer 2 processing task: 4
Produced: 5
Consumer 1 processing task: 5
All tasks completed.

一个生产者Goroutine,多个消费者Goroutine(每条消息每个消费者都会消费)

此处生产者生产的每个消息,每个消费者都会消费。
方案:

  1. 我们应该设计一个 多播机制(广播模式),即每个任务都会被多个消费者消费。最简单的方式是使用 复制通道(通过多个 goroutine 消费同一个任务通道)。

  2. 可以通过 使用多个通道,每个消费者都从这些通道中接收任务,或者使用 sync.WaitGroup 等方式来确保每个消费者都能够完成任务处理。

package main

import (
	"fmt"
	"sync"
	"time"
)

func producer(channels []chan int, wg *sync.WaitGroup) {
	defer wg.Done() // 确保 producer 完成时通知 WaitGroup

	// 生产 5 个任务
	for i := 1; i <= 5; i++ {
		fmt.Printf("Produced: %d\n", i)
		// 向每个通道发送任务
		for _, ch := range channels {
			ch <- i
		}
		time.Sleep(time.Second)
	}

	// 所有任务发送完毕后,关闭每个通道
	for _, ch := range channels {
		close(ch)
	}
}

func consumer(id int, ch chan int, wg *sync.WaitGroup) {
	defer wg.Done() // 确保 consumer 完成时通知 WaitGroup

	// 从通道中接收任务并处理
	for task := range ch {
		fmt.Printf("Consumer %d processing task: %d\n", id, task)
		time.Sleep(2 * time.Second) // 模拟任务处理延迟
	}
}

func main() {
	var wg sync.WaitGroup
	numConsumers := 3
	channels := make([]chan int, numConsumers)

	// 创建多个消费者通道
	for i := 0; i < numConsumers; i++ {
		channels[i] = make(chan int, 10)
	}

	// 启动多个消费者
	for i := 1; i <= numConsumers; i++ {
		wg.Add(1)
		go consumer(i, channels[i-1], &wg)
	}

	// 启动生产者
	wg.Add(1)
	go producer(channels, &wg)

	// 等待所有消费者和生产者完成工作
	wg.Wait()
	fmt.Println("All tasks completed.")
}

输出:

Produced: 1
Consumer 1 processing task: 1
Consumer 2 processing task: 1
Consumer 3 processing task: 1
Produced: 2
Produced: 3
Consumer 1 processing task: 2
Consumer 2 processing task: 2
Consumer 3 processing task: 2
Produced: 4
Produced: 5
Consumer 2 processing task: 3
Consumer 1 processing task: 3
Consumer 3 processing task: 3
Consumer 1 processing task: 4
Consumer 2 processing task: 4
Consumer 3 processing task: 4
Consumer 3 processing task: 5
Consumer 2 processing task: 5
Consumer 1 processing task: 5
All tasks completed.

历史文章

MySQL数据库

MySQL数据库

Redis

Redis数据库笔记合集

Golang

  1. Golang笔记——语言基础知识
  2. Golang笔记——切片与数组
  3. Golang笔记——hashmap
  4. Golang笔记——rune和byte
  5. Golang笔记——channel
  6. Golang笔记——Interface类型
  7. Golang笔记——数组、Slice、Map、Channel的并发安全性
  8. Golang笔记——协程同步
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值