『不要通过共享内存来通信,我们应该通过通信来共享内存』
Do not communicate by sharing memory; instead, share memory by communicating.
Go 语言推崇的并发解决方案(CSP)
在很多环境中,并发编程带来的很多问题都是因为没有正确实现访问共享内存的逻辑,虽然 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。
Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。这种方式可以保证在同一时间只有一个 Goroutine 能够访问对应的数据,所以数据冲突和线程竞争的问题在设计上就不可能出现。
通道的数据结构
Go 语言中 Channel 是 Goroutine 间重要通信的方式,是并发安全的,通道内的数据遵循 FIFO 的队列特性。
底层的数据结构定义如下,完整代码见 Github(go1.23):
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
- qcount 记录着通道内数据个数。
- dataqsiz 是缓存型通道的大小。
- buf 指向一个数组,用来实现循环队列。对于有缓冲通道,长度为 dataqsiz,对于无缓冲通道,buf 为 nil。
- sendx 是循环队列的队尾指针。
- recvx 是循环队列的队头指针。
- lock 是互斥锁,用来保护 hchan 的数据结构,保证操作的原子性。
hchan 的结构图如下:
「图片来自 深入 Go 语言之旅」
通道的使用
使用规则
Go 语言中通道分为有缓冲通道和无缓冲通道,两种通道进行对应的操作,有不同的结果和表现。
无缓冲通道(Unbuffered Channel)
| 操作 | 无等待者 | 有等待者 | 已关闭的通道 |
|---|---|---|---|
| 发送操作 | 阻塞 | 成功 | panic |
| 接收操作 | 阻塞 | 成功 | 零值 |
| 关闭操作 | 成功 | 成功 | panic |
有缓冲通道(Buffered Channel)
| 操作 | 空通道 | 非空通道 | 已关闭的通道 |
|---|---|---|---|
| 发送操作 | 成功 | 缓冲区满:阻塞;缓冲区未满:成功 | panic |
| 接收操作 | 阻塞 | 成功 | 缓冲区不为空:成功;缓冲区为空:零值 |
| 关闭操作 | 成功 | 成功 | panic |
对已关闭的通道进行发送操作或者再次关闭通道都会导致 panic,使用时需要注意。
常见用法
Channel 类型和基本并发原语是有竞争关系的,它应用于并发场景,涉及到 Goroutine 之间的通讯,可以提供并发的保护等。综合起来,可以把 Channel 的应用场景分为五种类型。
- 数据交流:当作并发的 Buffer 或者 Queue,解决生产者 - 消费者问题。
- 数据传递:一个 Goroutine 将数据交给另一个 Goroutine,作为数据传递的媒介。
- 信号通知:一个 Goroutine 可以将信号传递给另一个或者另一组 Goroutine。
- 任务编排:可以让一组 Goroutine 按照一定的顺序并发或者串行的执行。
- 并发控制:利用 Channel 也可以实现互斥锁的机制,对竞态资源进行保护。
超时控制
代码示例
package main
import (
"fmt"
"testing"
"time"
)
func CallFunc(fn func()) <-chan struct{} {
ch := make(chan struct{}, 1)
go func() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
fn()
ch <- struct{}{}
}()
return ch
}
func testCallFunc(_ *testing.T, cost, timeout time.Duration) {
fn := func() {
fmt.Println("doing something")
time.Sleep(cost)
}
select {
case <-CallFunc(fn):
fmt.Println("done")
case <-time.After(timeout):
fmt.Println("timeout")
}
}
func TestCallFuncDone(t *testing.T) { testCallFunc(t, time.Second, time.Second*2) }
func TestCallFuncTimeout(t *testing.T) { testCallFunc(t, time.Second*2, time.Second) }
执行结果:
=== RUN TestCallFuncDone
doing something
done
--- PASS: TestCallFuncDone (1.00s)
=== RUN TestCallFuncTimeout
doing something
timeout
--- PASS: TestCallFuncTimeout (1.00s)
PASS
ok golang-channel-example 2.007s
协程间通信
代码示例
package main
import (
"fmt"
"testing"
"time"
)
// 向 channel 里投递数据
func Provider(ch chan<- int, nums ...int) {
for _, num := range nums {
ch <- num
}
close(ch)
}
// 从 channel 里消费数据
func Consumer(ch <-chan int, done chan<- struct{}, fn func(int)) {
for num := range ch {
fn(num)
}
done <- struct{}{}
close(done)
}
func testProviderAndConsumer(t *testing.T, consumeFn func(int), nums ...int) {
// 数据交换 channel
ch := make(chan int, 10)
// 控制 channel 退出
done := make(chan struct{})
go Provider(ch, nums...)
go Consumer(ch, done, consumeFn)
// 接收退出并进行超时处理
select {
case <-done:
fmt.Println("done")
case <-time.After(time.Second * 5):
fmt.Println("timeout")
}
}
func ConsumeFunc(num int) {
fmt.Println(num)
time.Sleep(time.Millisecond * 100)
}
func TestProviderAndConsumer_1(t *testing.T) { testProviderAndConsumer(t, ConsumeFunc, 1, 2, 3, 4, 5) }
func TestProviderAndConsumer_2(t *testing.T) { testProviderAndConsumer(t, ConsumeFunc, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15) }
执行结果:
=== RUN TestProviderAndConsumer_1
1
2
3
4
5
done
--- PASS: TestProviderAndConsumer_1 (0.50s)
=== RUN TestProviderAndConsumer_2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
done
--- PASS: TestProviderAndConsumer_2 (1.52s)
PASS
ok golang-channel-example 4.028s
模拟互斥锁
代码示例
package main
import (
"fmt"
"sync"
"testing"
)
var _ sync.Locker = (*XMutex)(nil)
type XMutex struct {
mu chan struct{}
}
func NewXMutex() *XMutex {
return &XMutex{mu: make(chan struct{}, 1)}
}
func (x *XMutex) Lock() {
x.mu <- struct{}{}
}
func (x *XMutex) Unlock() {
<-x.mu
}
// NOTE: 模拟互斥锁实现
func testChannelMutexAdd(_ *testing.T, nums uint) {
var count = 0
var wg sync.WaitGroup
mutex := NewXMutex()
for range nums {
wg.Add(1)
go func() {
defer wg.Done()
for range 10000 {
mutex.Lock()
count++
mutex.Unlock()
}
}()
}
wg.Wait()
fmt.Printf("Want %d, Got %d\n", 10000*nums, count)
}
func TestChannelMutexAdd_10(t *testing.T) { testChannelMutexAdd(t, 10) }
func TestChannelMutexAdd_100(t *testing.T) { testChannelMutexAdd(t, 100) }
func TestChannelMutexAdd_1000(t *testing.T) { testChannelMutexAdd(t, 1000) }
执行结果:
=== RUN TestChannelMutexAdd_10
Want 100000, Got 100000
--- PASS: TestChannelMutexAdd_10 (0.05s)
=== RUN TestChannelMutexAdd_100
Want 1000000, Got 1000000
--- PASS: TestChannelMutexAdd_100 (0.49s)
=== RUN TestChannelMutexAdd_1000
Want 10000000, Got 10000000
--- PASS: TestChannelMutexAdd_1000 (5.56s)
PASS
ok command-line-arguments 7.138s
模拟定时器
代码示例
package main
import (
"fmt"
"testing"
"time"
)
func Tick(duration time.Duration) <-chan struct{} {
ch := make(chan struct{}, 1)
go func() {
for {
time.Sleep(duration)
ch <- struct{}{}
}
}()
return ch
}
func testChannelTimeTick(_ *testing.T, delay, timeout time.Duration) {
after := time.After(timeout)
ticker := Tick(delay)
for {
select {
case <-ticker:
fmt.Println("tick")
case <-after:
return
}
}
}
func TestChannelTimeTick_1(t *testing.T) { testChannelTimeTick(t, time.Millisecond*100, time.Second) }
func TestChannelTimeTick_2(t *testing.T) { testChannelTimeTick(t, time.Millisecond*500, time.Second) }
运行结果:
=== RUN TestChannelTimeTick_1
tick
tick
tick
tick
tick
tick
tick
tick
tick
--- PASS: TestChannelTimeTick_1 (1.00s)
=== RUN TestChannelTimeTick_2
tick
--- PASS: TestChannelTimeTick_2 (1.00s)
PASS
ok command-line-arguments 3.017s