由多个goroutine中获取第一个错误信息出发的CAS学习 Link to heading

此前我对于原子操作用的不是很多。按照之前看来的经验总结,Go中写高并发程序一般还是从逻辑上来避免加锁,毕竟原子操作写起来难度很大,而且不实际测试一下很容易写错。 不过如果能用上原子操作肯定是最好的。昨天工作的时候正好碰上了一个能用到CAS的使用场景,以此为契机学习并使用Go中的CAS。

从问题开始的CAS Link to heading

CAS(Compare And Swap)是最基础的原子操作之一,当年上操作系统就有提过。 遇到的这个问题其实本来是不用CAS的。 具体来说,有多个goroutine会在循环中被逐个启动,每个goroutine都可能会返回一个error。如果这些goroutine中的error至少有一个非空,则需要退出返回这个error并重新执行。 该代码原来的写法存在bug,在昨天写新代码的时候想到了这篇文章。

如下的写法是最容易想到的。其实这么写也是符合业务规范的(也被用了进去),只是这样的写法会造成数据竞争,最终的error值会为所有通过if判定条件中最后一个修改err的goroutine所对应的值。 问题在于,如果我希望获取到第一个产生的非空的error的值,应该怎么做?显然,此时程序不能发生数据竞争。

func test1() {
	// 存在数据竞争
	const num = 10
	var err error = nil
	wg := &sync.WaitGroup{}
	for i := 0; i < num; i++ {
		wg.Add(1)
		go func() { // 该goroutine存在数据竞争,读写冲突(读前写)
			defer wg.Done()
			innerErr := getErr()
			if innerErr != nil && err == nil { // 读的位置
				fmt.Println("err:", innerErr)
				err = innerErr // 竞争位置,之前写的地方
			}
		}()
	}
	wg.Wait()
	fmt.Println(err)
}

如果error为基本类型的话,这就是一个经典的CAS场景。Go中提供了unsafe.Pointer的CAS,因此可以用以下代码。 不过这么写的问题在于unsafe.Pointer的转换是存在一定的代价的,而且这么写感觉很奇怪。

func test2() {
	// 直接CAS
	const num = 10
	var err error = nil
	errPointer := unsafe.Pointer(&err)
	wg := &sync.WaitGroup{}
	for i := 0; i < num; i++ {
		wg.Add(1)
		go func() { // 没有数据竞争
			defer wg.Done()
			innerErr := getErr()
			if innerErr != nil {
				fmt.Println("err:", innerErr)
				innerErrPointer := unsafe.Pointer(&innerErr)
				atomic.CompareAndSwapPointer(&errPointer, unsafe.Pointer(&err), innerErrPointer)
			}
		}()
	}
	wg.Wait()
	errResult := (*error)(errPointer)
	fmt.Println(*errResult)
}

Go的CAS操作函数是有返回值的,如atomic pkg所示,CAS操作原子等价于

if *addr == old {
	*addr = new
	return true
}
return false

这也是函数声明中的swapped返回值func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

因此,可以借助返回值来实现类似的逻辑。该逻辑使用一个标记位来实现对err访问的原子并发控制。

func test3() {
	// 不存在数据竞争
	const num = 10
	var isErrSet uint32 = 0
	var err error = nil
	wg := &sync.WaitGroup{}
	for i := 0; i < num; i++ {
		wg.Add(1)
		go func() { // 没有数据竞争
			defer wg.Done()
			innerErr := getErr()
			if innerErr != nil { // 看上去像是可以写上 isErrSet == 0,但如果加上的话就会发生数据竞争
				fmt.Println("err:", innerErr)
				if atomic.CompareAndSwapUint32(&isErrSet, 0, 1) {
					err = innerErr // 可能得竞争位置,previous write的地方
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println(err)
}

其他做法 Link to heading

这个地方CAS倒也并不是最佳的使用范例,只是可以用而已。毕竟CAS真的挺容易写错的……

一个是可以选择使用sync.Once,这个函数可以保证once.Do中的函数只执行一次。

func test4() {
	// 不存在数据竞争
	const num = 10
	once := new(sync.Once)
	var err error = nil
	wg := new(sync.WaitGroup)
	for i := 0; i < num; i++ {
		wg.Add(1)
		go func() { // 没有数据竞争
			defer wg.Done()
			innerErr := getErr()
			if innerErr != nil { // 尽管看上去很像是要写 isErrSet == 0,但实际上不应该写。如果加上的话就会发生数据竞争
				fmt.Println("err:", innerErr)
				once.Do(func() {
					err = innerErr
				})
			}
		}()
	}
	wg.Wait()
	fmt.Println(err)
}

另一个可选择的方法是使用errgroup。该方法的问题是操作性会比较低,对于EOF等非nil但是又可能是正常的错误可能会造成非预期的结果,把真正需要的err给漏掉。届时可能还是需要自行实现。

func test5() {
	// 不存在数据竞争
	const num = 10
	g, _ := errgroup.WithContext(context.Background())
	var err error = nil
	for i := 0; i < num; i++ {
		g.Go(func() error {
			innerErr := getErr()
			if innerErr != nil {
				fmt.Println("err:", innerErr)
			}
			return innerErr
		})
	}
	err = g.Wait()
	fmt.Println(err)
}

CAS操作的原理 Link to heading

参考此文的解析,CAS操作是Go汇编中两个命令通过加锁实现的。 CPU有对应的CAS指令,不过看Go内使用了Lock的汇编命令。后续需要进一步学习。

当多个线程同时使用CAS操作一个变量时,只会有一个胜出。如果是互斥锁,则失败线程会休眠。而CAS操作下线程仅会被告知失败,并会不断自旋(忙等待)。

CAS底层原理的演进见此文,此处不再赘述。

CAS的问题 Link to heading

  1. 循环时间太长:共享资源竞争比较激烈的时候,每个goroutine会容易陷入自旋状态,难以修改对应的值。竞争比较激烈的时候用互斥锁效果好一些。
    1. 虽然也有说法是,互斥锁的耗时比较久,如果锁的代码较短的话加锁的耗时可能比代码执行的耗时还高。
  2. 只能保证一个共享变量的原子操作
    1. 如果是多个共享变量应该使用锁,或者将多个变量变为一个变量(比如放在一个对象,然后对对象的地址使用CAS)
  3. 无法解决ABA问题。这个也是无锁结构常见的问题
    1. 进程P1读取值A;
    2. P1被挂起(时间片耗尽、中断等),进程P2开始执行;
    3. P2修改数值A为数值B,然后又修改回A;s
    4. P1被唤醒,比较后发现数值A没有变化,程序继续执行。
  4. CAS造成Cache一致性流量过大。详见此文
    1. Cache一致性流量:对称多处理器需要保证Cache一致,CAS操作会经常导致其中某个CPU中缓存的值发生变化,使得其他CPU缓存中对应位置的值失效,从而需要通过总线从内存中加载该地址最新的值。这个通过总线来回通信称为”Cache一致性流量“
    2. 如果有很多线程共享同一个对象,CAS操作成功时会引起总线风暴,对L1缓存的同步需求增加
    3. 不过这个好像也没什么文章提到,可能是关键词检索的问题,之后再看一下。

我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=31u7nlc3nvuo0