临界区
临界区是指访问共享资源的代码段,这些资源无法被多个线程或协程同时安全地访问。若多个并发执行单元同时进入临界区操作共享资源,则可能发生竞态条件,导致数据竞争和不可预测的行为。
我们可以使用互斥锁,限定临界区只能同时由一个线程持有。当临界区由一个线程持有的时候,其它线程如果想进入这个临界区,就会返回失败,或者是等待。直到持有的线程退出临界区,这些等待线程中的某一个才有机会接着持有这个临界区。
锁的分类
在并发编程中,从锁的逻辑思想上来看,分为悲观锁和乐观锁两种类型,这两种锁代表了在并发控制中两种截然不同的哲学,核心在于处理冲突的时机。
悲观锁
总是假设最坏情况会发生。在访问数据之前就认为其他线程很可能也会修改这份数据。因此,在读取数据时就先加锁,确保在自己操作期间,任何其他线程都无法修改这份数据,直到当前操作完成并释放锁。
具体子类型有以下几种:
互斥锁
最基本的锁类型,同一时间只允许一个线程持有锁。其他尝试获取锁的线程会被阻塞,进入锁等待或直接失败,同时线程会释放 CPU 给其他线程。互斥锁适合写操作频繁、冲突概率高的场景,能有效避免脏写。
自旋锁
与互斥锁相同,同一时间只允许一个线程持有锁;区别在与线程在未获取锁时进入忙等待,持续循环检查而非立即阻塞。适用于锁持有时间短的场景,避免了线程阻塞和唤醒过程中的内核操作和上下文切换带来的开销成本。
读写锁
将锁分为读锁和写锁。读锁只要没有写锁存在,允许多个线程同时持有读锁。写锁就是互斥锁,一旦一个线程持有写锁,其他任何线程都无法获取锁。读写锁提高了线程的并发度,适用于读多写少的场景。
乐观锁
总是假设最好的情况。认为在自己操作期间,其他线程不太可能修改同一份数据,因此,在读取数据时不加锁,直接进行操作。但在提交更新前,会检查数据是否被其他事务修改过。如果没被修改,则提交成功;如果被修改了,则放弃本次修改。
『乐观锁本质上不是真正的锁机制,不需要传统意义上的阻塞其他线程的互斥锁』。常见的乐观锁实现机制有以下几种:
版本号机制
为数据项添加一个版本号或时间戳字段。每次读取数据时,同时读取当前版本号;在更新数据前,先检查当前内存中的版本号是否与之前读取的版本号一致,不一致则说明数据已经被其他线程所修改,更新操作失败。可参考『版本号机制在业务中的应用』。
Compare And Swap(CAS)
CAS 是一种由现代 CPU 直接支持的原子操作,是现代并发编程的基石,尤其是实现无锁数据结构和非阻塞算法中经常被用到。它的核心功能是:在一条不可分割的指令中,完成读取、比较、写入这三个步骤,确保操作的原子性。
Go 语言的悲观锁
Go 语言在 sync 包中提供了 sync.Mutex 和 sync.RWMutex 两种锁,前者是互斥锁,后者是读写锁。Go 语言并没有直接提供自旋锁的类型,通常使用 sync/atomic 包中的 CAS 原子操作来模拟实现,留作后续补充。
在 Go 语言中,悲观锁是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级更高的 Channel 实现同步。
sync.Mutex
互斥锁 Mutex 提供两个方法 Lock 和 Unlock,在进入临界区之前调用 Lock 方法,退出临界区的时候调用 Unlock 方法。当一个 Goroutine 通过调用 Lock 方法获得了这个锁的拥有权后,其它请求锁的 Goroutine 就会阻塞在 Lock 方法的调用上,直到锁被释放并且自己获取到了这个锁的拥有权。
代码示例
对临界区数据并发访问,加锁和不加锁示例,完整示例代码:
package main
import (
"fmt"
"sync"
"testing"
)
func TestConcurrencyAdd(t *testing.T) {
var count = 0
var wg sync.WaitGroup
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
for range 100000 {
count++
}
}()
}
wg.Wait()
fmt.Printf("Want %d, Got %d\n", 1000000, count)
}
func TestConcurrencyAddWithMutex(t *testing.T) {
var count = 0
var wg sync.WaitGroup
var mutex sync.Mutex
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
for range 100000 {
mutex.Lock()
count++
mutex.Unlock()
}
}()
}
wg.Wait()
fmt.Printf("Want %d, Got %d\n", 1000000, count)
}
运行结果:
=== RUN TestConcurrencyAdd
Want 1000000, Got 403300
--- PASS: TestConcurrencyAdd (0.00s)
=== RUN TestConcurrencyAddWithMutex
Want 1000000, Got 1000000
--- PASS: TestConcurrencyAddWithMutex (0.11s)
PASS
ok golang-sync-example 0.112s
可见在并发条件下需要对竞态数据进行锁保护,否则会导致结果与预期不符。
互斥锁 sync.Mutex 有『正常』和『饥饿』两种模式。
-
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
-
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。
sync.RWMutex
读写互斥锁 sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。
代码示例
分别使用 Mutex 和 RWMutex 对数据进行读写操作示例,完整示例代码:
- 三种场景,分别使用 Lock 和 RWLock 测试,共 6 个用例。
- 每次测试读写操作合计 1000 次,例如读多写少场景,读 900 次,写 100 次。
- 使用 sync.WaitGroup 阻塞直到读写操作全部运行结束。
package main
import (
"sync"
"testing"
"time"
)
const SleepDuration = time.Microsecond
type ReadAndWrite interface {
Read()
Write()
}
type CounterWithMutex struct {
mu sync.Mutex
count int
}
func (s *CounterWithMutex) Read() {
s.mu.Lock()
defer s.mu.Unlock()
_ = s.count
time.Sleep(SleepDuration)
}
func (s *CounterWithMutex) Write() {
s.mu.Lock()
defer s.mu.Unlock()
s.count++
time.Sleep(SleepDuration)
}
type CounterWithRWMutex struct {
mu sync.RWMutex
count int
}
func (s *CounterWithRWMutex) Read() {
s.mu.RLock()
defer s.mu.RUnlock()
_ = s.count
time.Sleep(SleepDuration)
}
func (s *CounterWithRWMutex) Write() {
s.mu.Lock()
defer s.mu.Unlock()
s.count++
time.Sleep(SleepDuration)
}
func benchmark(b *testing.B, obj ReadAndWrite, rt, wt int) {
for range b.N {
var wg sync.WaitGroup
// read op
for range rt * 100 {
wg.Add(1)
go func() {
obj.Read()
wg.Done()
}()
}
// write op
for range wt * 100 {
wg.Add(1)
go func() {
obj.Write()
wg.Done()
}()
}
wg.Wait()
}
}
// NOTE: 读写操作次数为 9:1
func BenchmarkCounterWithMutex_RW_91(b *testing.B) { benchmark(b, &CounterWithMutex{}, 9, 1) }
func BenchmarkCounterWithRWMutex_RW_91(b *testing.B) { benchmark(b, &CounterWithRWMutex{}, 9, 1) }
// NOTE: 读写操作次数为 1:9
func BenchmarkCounterWithMutex_RW_19(b *testing.B) { benchmark(b, &CounterWithMutex{}, 1, 9) }
func BenchmarkCounterWithRWMutex_RW_19(b *testing.B) { benchmark(b, &CounterWithRWMutex{}, 1, 9) }
// NOTE: 读写操作次数为 5:5
func BenchmarkCounterWithMutex_RW_55(b *testing.B) { benchmark(b, &CounterWithMutex{}, 5, 5) }
func BenchmarkCounterWithRWMutex_RW_55(b *testing.B) { benchmark(b, &CounterWithRWMutex{}, 5, 5) }
在 SleepDuration=1nanosecond(1纳秒)时的运行结果:
goos: darwin
goarch: arm64
pkg: golang-sync-example
cpu: Apple M1
BenchmarkCounterWithMutex_RW_91-8 1707 671143 ns/op
BenchmarkCounterWithRWMutex_RW_91-8 3802 308129 ns/op
BenchmarkCounterWithMutex_RW_19-8 1836 667419 ns/op
BenchmarkCounterWithRWMutex_RW_19-8 1746 677809 ns/op
BenchmarkCounterWithMutex_RW_55-8 1770 684308 ns/op
BenchmarkCounterWithRWMutex_RW_55-8 2407 491069 ns/op
PASS
ok golang-sync-example 7.595s
在 SleepDuration=1nanosecond(10纳秒)时的运行结果:
goos: darwin
goarch: arm64
pkg: golang-sync-example
cpu: Apple M1
BenchmarkCounterWithMutex_RW_91-8 1372 917703 ns/op
BenchmarkCounterWithRWMutex_RW_91-8 3470 316473 ns/op
BenchmarkCounterWithMutex_RW_19-8 1740 666150 ns/op
BenchmarkCounterWithRWMutex_RW_19-8 1734 678055 ns/op
BenchmarkCounterWithMutex_RW_55-8 1736 662240 ns/op
BenchmarkCounterWithRWMutex_RW_55-8 2198 581304 ns/op
PASS
ok golang-sync-example 8.489s
在 SleepDuration=1microsecond(1微秒)时的运行结果:
goos: darwin
goarch: arm64
pkg: golang-sync-example
cpu: Apple M1
BenchmarkCounterWithMutex_RW_91-8 357 3154526 ns/op
BenchmarkCounterWithRWMutex_RW_91-8 1934 608672 ns/op
BenchmarkCounterWithMutex_RW_19-8 388 3068992 ns/op
BenchmarkCounterWithRWMutex_RW_19-8 421 2849951 ns/op
BenchmarkCounterWithMutex_RW_55-8 390 3041494 ns/op
BenchmarkCounterWithRWMutex_RW_55-8 680 1739662 ns/op
PASS
ok golang-sync-example 8.672s
在 SleepDuration=10microsecond(10微秒)时的运行结果:
goos: darwin
goarch: arm64
pkg: golang-sync-example
cpu: Apple M1
BenchmarkCounterWithMutex_RW_91-8 75 15430592 ns/op
BenchmarkCounterWithRWMutex_RW_91-8 658 1820549 ns/op
BenchmarkCounterWithMutex_RW_19-8 76 15071712 ns/op
BenchmarkCounterWithRWMutex_RW_19-8 82 13616595 ns/op
BenchmarkCounterWithMutex_RW_55-8 75 14976465 ns/op
BenchmarkCounterWithRWMutex_RW_55-8 154 7710706 ns/op
PASS
ok golang-sync-example 8.073s
在 SleepDuration=1millisecond(1毫秒)时的运行结果:
goos: darwin
goarch: arm64
pkg: golang-sync-example
cpu: Apple M1
BenchmarkCounterWithMutex_RW_91-8 1 1145668959 ns/op
BenchmarkCounterWithRWMutex_RW_91-8 9 116771389 ns/op
BenchmarkCounterWithMutex_RW_19-8 1 1141892125 ns/op
BenchmarkCounterWithRWMutex_RW_19-8 1 1034747916 ns/op
BenchmarkCounterWithMutex_RW_55-8 1 1145773833 ns/op
BenchmarkCounterWithRWMutex_RW_55-8 2 576389521 ns/op
PASS
ok golang-sync-example 7.490s
对比结果可以看出,在读大于写的情况下,RWMutex 整体性能要好于 Mutex。
在业务开发场景中,大多数服务的资源读写比例非常高,因为大多数的读请求之间不会相互影响,所以我们可以分离读写操作,以此来提高服务的性能。
相关问题
什么是 CAS 操作中的 ABA 问题?
ABA 问题是 CAS 实现乐观锁的一个经典陷阱。它描述了一种情况:一个变量的值在操作过程中从 A 变成了 B,然后又从 B 变回了 A,但 CAS 操作却误以为这个值从未被修改过,从而可能导致逻辑错误。
CAS 操作只检查目标内存位置的当前值是否等于预期值。如果相等,则更新为新值,它不关心这个值在读取预期值之后、执行 CAS 操作之前是否经历了其他变化。当一个线程 T1 读取共享变量值为 A 后,另一个线程 T2 可能将值修改为 B,然后由另一个线程 T3 又修改回 A。对于线程 T1 来说,它只看到了初始的 A 和当前的 A,中间发生的 A->B->A 过程对它来说是透明的。
解决 ABA 问题的核心思路是,在检查值是否变化的同时,增加对「状态」或「版本」的检查,确保即使值变回原样,其状态也已经不同了,使得 CAS 操作能检测到值背后的状态是否真的未变。