Go 语言通道的常见用法

2025/06/17

Categories: 技术 Tags: Golang

『不要通过共享内存来通信,我们应该通过通信来共享内存』

Do not communicate by sharing memory; instead, share memory by communicating.

Go 语言推崇的并发解决方案(CSP)

在很多环境中,并发编程带来的很多问题都是因为没有正确实现访问共享内存的逻辑,虽然 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。

Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。这种方式可以保证在同一时间只有一个 Goroutine 能够访问对应的数据,所以数据冲突和线程竞争的问题在设计上就不可能出现。

通道的数据结构

Go 语言中 Channel 是 Goroutine 间重要通信的方式,是并发安全的,通道内的数据遵循 FIFO 的队列特性。

底层的数据结构定义如下,完整代码见 Github(go1.23):

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    timer    *timer // timer feeding this chan
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

hchan 的结构图如下: 「图片来自 深入 Go 语言之旅

通道的使用

使用规则

Go 语言中通道分为有缓冲通道和无缓冲通道,两种通道进行对应的操作,有不同的结果和表现。

无缓冲通道(Unbuffered Channel)

操作 无等待者 有等待者 已关闭的通道
发送操作 阻塞 成功 panic
接收操作 阻塞 成功 零值
关闭操作 成功 成功 panic

有缓冲通道(Buffered Channel)

操作 空通道 非空通道 已关闭的通道
发送操作 成功 缓冲区满:阻塞;缓冲区未满:成功 panic
接收操作 阻塞 成功 缓冲区不为空:成功;缓冲区为空:零值
关闭操作 成功 成功 panic

对已关闭的通道进行发送操作或者再次关闭通道都会导致 panic,使用时需要注意。

常见用法

Channel 类型和基本并发原语是有竞争关系的,它应用于并发场景,涉及到 Goroutine 之间的通讯,可以提供并发的保护等。综合起来,可以把 Channel 的应用场景分为五种类型。

完整示例代码

超时控制

代码示例
package main

import (
    "fmt"
    "testing"
    "time"
)

func CallFunc(fn func()) <-chan struct{} {
    ch := make(chan struct{}, 1)

    go func() {
        defer func() {
            if err := recover(); err != nil {
                fmt.Println(err)
            }
        }()

        fn()
        ch <- struct{}{}
    }()

    return ch
}

func testCallFunc(_ *testing.T, cost, timeout time.Duration) {
    fn := func() {
        fmt.Println("doing something")
        time.Sleep(cost)
    }

    select {
    case <-CallFunc(fn):
        fmt.Println("done")
    case <-time.After(timeout):
        fmt.Println("timeout")
    }
}

func TestCallFuncDone(t *testing.T)    { testCallFunc(t, time.Second, time.Second*2) }
func TestCallFuncTimeout(t *testing.T) { testCallFunc(t, time.Second*2, time.Second) }

执行结果:

=== RUN   TestCallFuncDone
doing something
done
--- PASS: TestCallFuncDone (1.00s)
=== RUN   TestCallFuncTimeout
doing something
timeout
--- PASS: TestCallFuncTimeout (1.00s)
PASS
ok      golang-channel-example  2.007s

协程间通信

代码示例
package main

import (
    "fmt"
    "testing"
    "time"
)

// 向 channel 里投递数据
func Provider(ch chan<- int, nums ...int) {
    for _, num := range nums {
        ch <- num
    }
    close(ch)
}

// 从 channel 里消费数据
func Consumer(ch <-chan int, done chan<- struct{}, fn func(int)) {
    for num := range ch {
        fn(num)
    }
    done <- struct{}{}
    close(done)
}

func testProviderAndConsumer(t *testing.T, consumeFn func(int), nums ...int) {
    // 数据交换 channel
    ch := make(chan int, 10)
    // 控制 channel 退出
    done := make(chan struct{})

    go Provider(ch, nums...)
    go Consumer(ch, done, consumeFn)

    // 接收退出并进行超时处理
    select {
    case <-done:
        fmt.Println("done")
    case <-time.After(time.Second * 5):
        fmt.Println("timeout")
    }
}

func ConsumeFunc(num int) {
    fmt.Println(num)
    time.Sleep(time.Millisecond * 100)
}

func TestProviderAndConsumer_1(t *testing.T) { testProviderAndConsumer(t, ConsumeFunc, 1, 2, 3, 4, 5) }
func TestProviderAndConsumer_2(t *testing.T) { testProviderAndConsumer(t, ConsumeFunc, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15) }

执行结果:

=== RUN   TestProviderAndConsumer_1
1
2
3
4
5
done
--- PASS: TestProviderAndConsumer_1 (0.50s)
=== RUN   TestProviderAndConsumer_2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
done
--- PASS: TestProviderAndConsumer_2 (1.52s)
PASS
ok      golang-channel-example  4.028s

模拟互斥锁

代码示例
package main

import (
    "fmt"
    "sync"
    "testing"
)

var _ sync.Locker = (*XMutex)(nil)

type XMutex struct {
    mu chan struct{}
}

func NewXMutex() *XMutex {
    return &XMutex{mu: make(chan struct{}, 1)}
}

func (x *XMutex) Lock() {
    x.mu <- struct{}{}
}

func (x *XMutex) Unlock() {
    <-x.mu
}

// NOTE: 模拟互斥锁实现
func testChannelMutexAdd(_ *testing.T, nums uint) {
    var count = 0
    var wg sync.WaitGroup
    mutex := NewXMutex()

    for range nums {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for range 10000 {
                mutex.Lock()
                count++
                mutex.Unlock()
            }
        }()
    }

    wg.Wait()
    fmt.Printf("Want %d, Got %d\n", 10000*nums, count)
}

func TestChannelMutexAdd_10(t *testing.T)   { testChannelMutexAdd(t, 10) }
func TestChannelMutexAdd_100(t *testing.T)  { testChannelMutexAdd(t, 100) }
func TestChannelMutexAdd_1000(t *testing.T) { testChannelMutexAdd(t, 1000) }

执行结果:

=== RUN   TestChannelMutexAdd_10
Want 100000, Got 100000
--- PASS: TestChannelMutexAdd_10 (0.05s)
=== RUN   TestChannelMutexAdd_100
Want 1000000, Got 1000000
--- PASS: TestChannelMutexAdd_100 (0.49s)
=== RUN   TestChannelMutexAdd_1000
Want 10000000, Got 10000000
--- PASS: TestChannelMutexAdd_1000 (5.56s)
PASS
ok      command-line-arguments  7.138s

模拟定时器

代码示例
package main

import (
    "fmt"
    "testing"
    "time"
)

func Tick(duration time.Duration) <-chan struct{} {
    ch := make(chan struct{}, 1)

    go func() {
        for {
            time.Sleep(duration)
            ch <- struct{}{}
        }
    }()

    return ch
}

func testChannelTimeTick(_ *testing.T, delay, timeout time.Duration) {
    after := time.After(timeout)
    ticker := Tick(delay)

    for {
        select {
        case <-ticker:
            fmt.Println("tick")
        case <-after:
            return
        }
    }
}

func TestChannelTimeTick_1(t *testing.T) { testChannelTimeTick(t, time.Millisecond*100, time.Second) }
func TestChannelTimeTick_2(t *testing.T) { testChannelTimeTick(t, time.Millisecond*500, time.Second) }

运行结果:

=== RUN   TestChannelTimeTick_1
tick
tick
tick
tick
tick
tick
tick
tick
tick
--- PASS: TestChannelTimeTick_1 (1.00s)
=== RUN   TestChannelTimeTick_2
tick
--- PASS: TestChannelTimeTick_2 (1.00s)
PASS
ok      command-line-arguments  3.017s

参考文档