目录

Go 基础知识与框架体系 系列十:Golang 并发包 sync 系列

这篇文章总结了 Golang 的sync包,包括sync.Mapsync.Oncesync.Pool、其中sync.Mutex的底层通过semaphore机制实现等等。

Go语言里对同步的支持主要有五类应用场景:

  1. 资源独占:当多个线程依赖同一份资源(比如数据),需要同时读/写同一个内存地址时,runtime需要保证只有一个修改这份数据,并且保证该修改对其他线程可见。锁和变量的原子操作为此而设计
  2. 生产者-消费者:在生产者-消费者模型中,消费者依赖生产者产出数据。 channel(管道)为此而设计
  3. 懒加载:一个资源,当且仅当第一次执行一个操作时,该操作执行过程中其他的同类操作都会被阻塞,直到该操作完成。sync.Once为此而设计
  4. fork-join:一个任务首先创建出$N$个子任务,$N$个子任务全部执行完成以后,主任务搜集结果,执行后续操作。sync.WaitGroup为此而设计
  5. 条件变量:条件变量是一个同步原语,可以同时阻塞多个线程,直到另一个线程 1) 修改了条件; 2)通知一个(或所有)等待的线程。sync.Cond为此而设计
注意
注意:这里当我说"线程"时,了解Go的同学可以自动映射到 “goroutine”(协程)。

1 sync.Map

1. sync.Map 用法

sync.Map是一个线程安全的map结构,一般用于多读少写的并发操作,下图是sync.Map的数据结构:

/syncMap.png
图1:sync.Map的数据结构

代码结构是:

1
2
3
4
5
6
type Map struct {
	mu Mutex
	read atomic.Value // readOnly
	dirty map[interface{}]*entry
	misses int
}
  • muMap的互斥锁用于对并发操作进行加锁保护
  • read是用于存储只读内容的,可以提供高并发的读操作
  • dirty是一个原始的map结构体,对dirty的操作需要加锁,dirty包涵了全量的数据,在读数据的时候会先读取readread读取不到再读dirty
  • missesread读取失败的次数,当多次读取失败后 misses 累计特定值,dirty就会升级成read

sync.Map这里采用的策略类似数据库常用的”读写分离”,技术都是相通的。具体例子如下:

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
	var value sync.Map
	// 写入
	value.Store("your name", "shi")
	value.Store("her name", "kanon")
	// 读取
	name, ok := value.Load("your name")
	if !ok {
		println("can't find name")
	}
	fmt.Println(name)
	// 遍历
	value.Range(func(ki, vi interface{}) bool {
		k, v := ki.(string), vi.(string)
		fmt.Println(k, v)
		return true
	})
	// 删除
	value.Delete("your name")
	// 读取,如果不存在则写入
	activename, loaded := value.LoadOrStore("his name", "baba")
	fmt.Println(activename.(string), loaded)
}

2. 原理结构

Load

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
  // 首先从只读字段读取内容
	read, _ := m.read.Load().(readOnly)
  e, ok := read.m[key]
  // 如果没读到,并且dirty有read没有数据则从dirty中读取
	if !ok && read.amended {
		m.mu.Lock()
		// 在从dirty读取前需要加锁后再做一次验证,防止期间read突然有数据,也就是double check
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
      // 将此次记录记录添加到miss中,可以看到这里没对dirty的取值做判断,也就是说不管是否
      // 取到miss都会添加一次
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

Load方法讲解:首先从只读字段read中读取键值的内容,如果没读到,并且amendedtrue(dirtyread没有数据)则尝试从dirty中读取,不过这里要做 double check, 然后将此次缓存穿透记录一次到miss字段。

Store

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (m *Map) Store(key, value interface{}) {
  // 存储之前先从只读字段读取要存储的值,如果存在,则用CAS的方式将新的值存储进去
  read, _ := m.read.Load().(readOnly)
  // tryStore 会检查dirty的keys值是否已经删除,
  // 如果没有删除标记,则直接采用CAS方式存储entry
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

  m.mu.Lock()
  // double check
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
    // double check 如果read存在,调用 unexpungeLocked 将 expunged 设置为 nil,
    // 然后更新dirty,expunged 表示dirty中记录的删除标识(read没同步),由于有新的值存储需要
    // 将删除标识更新。
		if e.unexpungeLocked() {
			m.dirty[key] = e
		}
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
    // 如果read中没有对应的键,从dirty中有则直接更新dirty中的键
		e.storeLocked(&value)
	} else {
    // dirty 和 read 都不存在这个键的情况
		if !read.amended {
      // amended为true标识dirty包含read没有的key,由于dirty是最全的数据,amend为false只有两种
      // 情况,一种就是 dirty 的键值等于 read 的键值,一种是dirty为空的时候,所以这里只有可能是
			// 第二种,也就是dirty为空,因此再store 之前先判断一下 dirty map 是否为空,如果为空,就把 read map 浅拷贝一次。
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
    }
    // 如果dirty数据和read的key不同步数据,直接将值写入dirty
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

Store方法讲解:存储之前先从只读字段read中读取要存储的值,在read中存在键值对的时候,则用 CAS 的方式将新的值存储进去,如果不存在则加锁做个 double check,将新数据写入dirty中。如果dirtyread中都没数据,dirtyread的键值不同步,则将数据直接写入dirty, 如果dirty键值数据和read一样,同时dirtynil,将read浅拷贝一份到dirty,为后面赋值可以同时写入dirtyread

Delete

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (m *Map) Delete(key interface{}) {
	m.LoadAndDelete(key)
}

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if ok {
		return e.delete()
	}
	return nil, false
}

func (e *entry) delete() (value interface{}, ok bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return nil, false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return *(*interface{})(p), true
		}
	}
}

Delete方法讲解:sync.Map的 Delete 方法本质是用的读取和删除,也就是先读取到数据再对数据进行删除,读的方法和 Load 的方法是一样的。

Range

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Map) Range(f func(key, value interface{}) bool) {
  // 如果amend为ture,说明dirty包含了read所有的key,将dirty提升为read,
  // 并将dirty设置为nil,之后用Store存储新的值的时候再拷贝回来
  // 最后对read进行遍历即可
	read, _ := m.read.Load().(readOnly)
	if read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		if read.amended {
			read = readOnly{m: m.dirty}
			m.read.Store(read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}

	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

Range原理讲解:Range 本质是通过遍历只读字段read得到,为了让只读字段包含所有数据,当dirtyread不相等的时候,将dirty升级为read, 最后再对read进行遍历即可。

2 sync.Pool

1. sync.Pool 用法

sync.Pool是一个用来缓存大量重复对象,减少大量对象创建给GC压力,是sync异步包中很重要的一种数据结构,看其基本数据结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type Pool struct {
  // noCopy 表示不支持值拷贝,如果出现值拷贝用 go vet 编译检查的时候会报错
	noCopy noCopy

  // [P]poolLocal,表示每个local的P池
	local     unsafe.Pointer
  // local的长度
	localSize uintptr

  // 也是[P]poolLocal,表示上一个生命周期的local
	victim     unsafe.Pointer
  // victim的长度
	victimSize uintptr

  // 用于创建新对象方法,get获取不到就会调用创建一个新对象,一般由用户传入
	New func() interface{}
}

/syncPool.png
图2:sync.Pool的数据结构

sync.Pool用法有三种方法,如下:

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//初始化pool对象
var pool sync.Pool

type peter struct {
	num int
}

// 创建新对象创建方法
func initPool() {
	pool = sync.Pool{
		New: func() interface{} {
			return &peter{num: rand.Int()}
		},
	}
}

func main() {
  initPool()
  // 从pool对象池中取对象
	p1 := pool.Get().(*peter)
	fmt.Println("p1", p1.num)

  // 将对象放入pool对象池
	pool.Put(p1)

	p2 := pool.Get().(*peter)
  fmt.Println("p2", p2.num)
}

首先,需要初始化Pool,唯一需要的就是设置好New函数。当调用Get方法时,如果池子里缓存了对象,就直接返回缓存的对象。如果没有存货,则调用New函数创建一个新的对象。

另外,我们发现Get方法取出来的对象和上次Put进去的对象实际上是同一个,Pool没有做任何“清空”的处理。但我们不应当对此有任何假设,因为在实际的并发使用场景中,无法保证这种顺序,最好的做法是在Put前,将对象清空。

2. 单元测试

为了测试Get/Put的功能。我们来看下TestPoolNew

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func TestPoolNew(t *testing.T) {
    // disable GC so we can control when it happens.
    defer debug.SetGCPercent(debug.SetGCPercent(-1))

    i := 0
    p := Pool{
        New: func() interface{} {
            i++
            return i
        },
    }
    if v := p.Get(); v != 1 {
        t.Fatalf("got %v; want 1", v)
    }
    if v := p.Get(); v != 2 {
        t.Fatalf("got %v; want 2", v)
    }

    // Make sure that the goroutine doesn't migrate to another P
    // between Put and Get calls.
    Runtime_procPin()
    p.Put(42)
    if v := p.Get(); v != 42 {
        t.Fatalf("got %v; want 42", v)
    }
    Runtime_procUnpin()

    if v := p.Get(); v != 3 {
        t.Fatalf("got %v; want 3", v)
    }
}
问题
首先设置了GC=-1,作用就是停止GC。函数都跑完了,那为啥要用defer

注意到,debug.SetGCPercent这个函数被调用了两次,而且这个函数返回的是上一次GC的值。因此,defer在这里的用途是还原到调用此函数之前的GC设置,也就是恢复现场。

  • 调置了PoolNew函数:直接返回一个int,变且每次调用New,都会自增 1。然后,连续调用了两次Get函数,因为这个时候Pool里没有缓存的对象,因此每次都会调用New创建一个,所以第一次返回 1,第二次返回 2。
  • 调用Runtime_procPin()防止 goroutine 被强占,目的是保护接下来的一次PutGet操作,使得它们操作的对象都是同一个 P 的“池子”。并且,这次调用Get的时候并没有调用New,因为之前有一次Put的操作。
  • 再次调用Get操作,因为没有“存货”,因此还是会再次调用New创建一个对象。

其他TestPool函数:

  • TestPoolGCTestPoolRelease则主要测试GCPool里对象的影响。这里用了一个函数,用于计数有多少对象会被GC回收:
1
2
3
runtime.SetFinalizer(v, func(vv *string) {
    atomic.AddUint32(&fin, 1)
})
  • TestPoolStress从名字看,主要是想测一下“压力”,具体操作就是起了 10 个 goroutine 不断地向PoolPut对象,然后又Get对象,看是否会出错。
  • TestPoolDequeueTestPoolChain,都调用了testPoolDequeue,这是具体干活的。它需要传入一个PoolDequeue接口:
1
2
3
4
5
6
// poolDequeue testing.
type PoolDequeue interface {
    PushHead(val interface{}) bool
    PopHead() (interface{}, bool)
    PopTail() (interface{}, bool)
}

PoolDequeue是一个双端队列,可以从头部入队元素,从头部和尾部出队元素。调用函数时,前者传入NewPoolDequeue(16),后者传入NewPoolChain(),底层其实都是poolDequeue这个结构体。

具体testPoolDequeue工作过程如下:

/testPoolDequeue.png
图3:testPoolDequeue工作流程

总共起了 10 个 goroutine:1 个生产者,9 个消费者。生产者不断地从队列头pushHead元素到双端队列里去,并且每 push 10 次,就popHead一次;消费者则一直从队列尾取元素。不论是从队列头还是从队列尾取元素,都会在map里做标记,最后检验每个元素是不是只被取出过一次。

3. 原理结构

Pool没有缓存的对象时,调用New方法生成一个新的对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type poolLocal struct {
    poolLocalInternal

    // 将 poolLocal 补齐至两个缓存行的倍数,防止 false sharing,
    // 每个缓存行具有 64 bytes,即 512 bit
    // 目前我们的处理器一般拥有 32 * 1024 / 64 = 512 条缓存行
    // 伪共享,仅占位用,防止在 cache line 上分配多个 poolLocalInternal
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
    // P 的私有缓存区,使用时无需要加锁
    private interface{}
    // 公共缓存区。本地 P 可以 pushHead/popHead;其他 P 则只能 popTail
    shared  poolChain
}

如果没有pad字段,那么当需要访问 0 号索引的poolLocal时,CPU 同时会把 0 号和 1 号索引同时加载到 cpu cache。在只修改 0 号索引的情况下,会让 1 号索引的poolLocal失效。这样,当其他线程想要读取 1 号索引时,发生 cache miss,还得重新再加载,对性能有损。增加一个pad,补齐缓存行,让相关的字段能独立地加载到缓存行就不会出现 false sharding 了。

关于poolChain结构体,是双端队列的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type poolChain struct {
    // 只有生产者会 push to,不用加锁
    head *poolChainElt

    // 读写需要原子控制。 pop from
    tail *poolChainElt
}

type poolChainElt struct {
    poolDequeue

    // next 被 producer 写,consumer 读。所以只会从 nil 变成 non-nil
    // prev 被 consumer 写,producer 读。所以只会从 non-nil 变成 nil
    next, prev *poolChainElt
}

type poolDequeue struct {
    // The head index is stored in the most-significant bits so
    // that we can atomically add to it and the overflow is
    // harmless.
    // headTail 包含一个 32 位的 head 和一个 32 位的 tail 指针。这两个值都和 len(vals)-1 取模过。
    // tail 是队列中最老的数据,head 指向下一个将要填充的 slot
    // slots 的有效范围是 [tail, head),由 consumers 持有。
    headTail uint64

    // vals 是一个存储 interface{} 的环形队列,它的 size 必须是 2 的幂
    // 如果 slot 为空,则 vals[i].typ 为空;否则,非空。
    // 一个 slot 在这时宣告无效:tail 不指向它了,vals[i].typ 为 nil
    // 由 consumer 设置成 nil,由 producer 读
    vals []eface
}
  • poolDequeue 被实现为单生产者、多消费者的固定大小的无锁(atomic实现) Ring式队列(底层存储使用数组,使用两个指针标记head、tail)。生产者可以从head插入、head删除,而消费者仅可从tail删除。
  • headTail指向队列的头和尾,通过位运算将headtail存入headTail变量中。

对于双端队列的理解:

/双端队列.png
图4:双端队列

我们看到Pool并没有直接使用poolDequeue,原因是它的大小是固定的,而Pool的大小是没有限制的。因此,在poolDequeue之上包装了一下,变成了一个poolChainElt的双向链表,可以动态增长。(摘录于segmentFault)

3. semaphore

1. 数据结构

sema.go中定义了一个全局变量,semtable数组。大小为251,元素为一个匿名结构体。这里为了避免伪共享问题做了一下内存填充。

1
2
3
4
5
6
7
// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
 root semaRoot
 pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

每个元素持有的semaRoot为这个数据结构的核心。

1
2
3
4
5
6
7
// 为sync.Mutex准备的异步信号量
// golang.org/issue/17953 可以查看引入二级列表之前性能较差的程序示例test/locklinear.go
type semaRoot struct {
 lock  mutex
 treap *sudog // 平衡树的根节点
 nwait uint32 // Number of waiters. Read w/o the lock.
}

semaRoot的结构看上去并不复杂,每个semaRoot持有一个具有不同地址(sudog.elem)sudog平衡树,每个sudog都可以通过s.waitlink依次指向一个相同地址等待的sudog列表, 在具有相同等待地址的sudog内部列表上的操作时间复杂度都是$O(1)$。顶层semaRoot列表的扫描为$O(\log n)$,其中$n$是阻止goroutines的不同信号量地址的数量。

问题
既然节点都是sudog,那它是如何定义的?
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
type sudog struct {
 g *g
 next *sudog
 prev *sudog
 elem unsafe.Pointer //数据元素 (可能指向栈)
 // 下面的字段不会并发访问
 // 对于channels, waitlink 只被g访问
  // 对于semaphores, 所有自动(包括上面的)只有获取semaRoot的锁才能被访问
 acquiretime int64
 releasetime int64
 ticket      uint32
  //isSelect表示g正在参与一个select,因此必须对g.selectDone进行CAS才能赢得唤醒竞争
 isSelect bool
  //success表示channel c上的通信是否成功。如果goroutine因为在通道c上传递了一个值而被唤醒,则为true;
  //如果因为channel c关闭而被唤醒,则为false
 success bool
  
 parent   *sudog // semaRoot binary tree
 waitlink *sudog // g.waiting list or semaRoot
 waittail *sudog // semaRoot
 c        *hchan // channel
}

这里可能就涉及到了Go的运行时调度的知识:

注意
  • sudog是对goroutine的一种封装,比如当你使用channel时,goroutine在sending/receiving阻塞时是被封装成sudog放进阻塞队列进行等待。sudog是必需的,因为g和同步对象的关系是多对多的。一个g可以出现在许多等待列表上,因此一个g可能有很多个sudog。并且许多g可能正在等待同一个同步对象,因此一个对象可能有许多sudog
  • sudog是从一个特殊的pool中分配。使用acquireSudogreleaseSudog来分配和释放他们。

其中的next、prev、parent字段构成了平衡树,waitlinkwaittail构成了相同信号量地址的链表结构。

/sudog.png
图5:sudog涉及的链表结构

关于源码分析,我会在另一篇文章提到。

4 sync.Once

1. sync.Once 用法

sync.Once是 Golang package 中使方法只执行一次的对象实现,作用与init函数类似。但也有所不同。

  • init函数是在文件包首次被加载的时候执行,且只执行一次
  • sync.Once 是在代码运行中需要的时候执行,且只执行一次

当一个函数不希望程序在一开始的时候就被执行的时候,我们可以使用sync.Once

例如:

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"sync"
)

func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}

# Output:
Only once

在多数情况下,sync.Once被用于控制变量的初始化,这个变量的读写通常遵循单例模式,满足这三个条件:

  1. 当且仅当第一次读某个变量时,进行初始化(写操作)
  2. 变量被初始化过程中,所有读都被阻塞(读操作;当变量初始化完成后,读操作继续进行
  3. 变量仅初始化一次,初始化完成后驻留在内存里

需要注意的点:

注意
  • Once常常用来初始化单例资源,或者并发访问只需初始化一次的共享资源,或者在测试的时候初始化一次测试资源。
  • sync.Once只暴露了一个方法Do,你可以多次调用Do方法,但是只有第一次调用Do方法时f参数才会执行,这里的f是一个无参数无返回值的函数。

2. 原理结构

Once的结构体:

1
2
3
4
type Once struct {
   done uint32 // 初始值为0表示还未执行过,1表示已经执行过
   m    Mutex 
}

其中,done成员变量:

  • 1 表示资源未初始化,需要进一步初始化
  • 0 表示资源已初始化,无需初始化,直接返回即可

m成员变量:

  • 为了防止多个goroutine调用doSlow()初始化资源时,造成资源多次初始化,因此采用Mutex锁机制来保证有且仅初始化一次

Once所拥有的方法DodoSlow:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (o *Once) Do(f func()) {
   // 判断done是否为0,若为0,表示未执行过,调用doSlow()方法初始化
   if atomic.LoadUint32(&o.done) == 0 {
      // Outlined slow-path to allow inlining of the fast-path.
      o.doSlow(f)
   }
}


// 加载资源
func (o *Once) doSlow(f func()) {
   o.m.Lock()
   defer o.m.Unlock()
   // 采用双重检测机制 加锁判断done是否为零
   if o.done == 0 {
      // 执行完f()函数后,将done值设置为1
      defer atomic.StoreUint32(&o.done, 1)
      // 执行传入的f()函数
      f()
   }

调用Do函数时,首先判断done值是否为0,若为1,表示传入的匿名函数f()已执行过,无需再次执行;若为0,表示传入的匿名函数f()还未执行过,则调用doSlow()函数进行初始化。

doSlow()函数中,若并发的goroutine进入该函数中,为了保证仅有一个goroutine执行f()匿名函数。为此,需要加互斥锁保证只有一个goroutine进行初始化,同时采用了双检查的机制(double-checking),再次判断o.done是否为0,如果为0,则是第一次执行,执行完毕后,就将o.done设置为 1,然后释放锁。

即使此时有多个 goroutine 同时进入了doSlow方法,因为双检查的机制,后续的 goroutine 会看到o.done的值为 1,也不会再次执行f

这样既保证了并发的 goroutine 会等待f完成,而且还不会多次执行f