并发编程
# 上下文context
这东西是用于控制并发的。下面是一个简单的例子
func main() {
// 使用context.Background()返回一个空的context,作为我们的context树的根节点
// 然后我们使用context.WithCancel来创建一个可以取消的context
// 第一个返回的是context对象,第二个返回的是一个回调函数,使用这个回调函数,我们可以取消context
ctx, cancel := context.WithCancel(context.Background())
// 启动一个协程,传入我们的context
go watch(ctx, "【监控1】")
go watch(ctx, "【监控2】")
go watch(ctx, "【监控3】")
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
// 调用cancel函数,我们就可以发出取消的指令,这样我们的goroutine就会就会收到信号,结束函数
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context, name string) {
for {
select {
// 这个ctx.done() 就是在监听context,判断是否结束了
case <-ctx.Done():
fmt.Println(name, "监控退出,停止了...")
return
default:
fmt.Println(name, "goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
/**
【监控3】 goroutine监控中...
【监控1】 goroutine监控中...
【监控2】 goroutine监控中...
【监控1】 goroutine监控中...
【监控2】 goroutine监控中...
【监控3】 goroutine监控中...
【监控1】 goroutine监控中...
【监控2】 goroutine监控中...
【监控3】 goroutine监控中...
【监控1】 goroutine监控中...
【监控2】 goroutine监控中...
【监控3】 goroutine监控中...
【监控3】 goroutine监控中...
【监控1】 goroutine监控中...
【监控2】 goroutine监控中...
可以了,通知监控停止
【监控2】 监控退出,停止了...
【监控3】 监控退出,停止了...
【监控1】 监控退出,停止了...
**/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
这就是 Context 的控制能力,它就像一个控制器一样,按下开关后,所有基于这个 Context 或者衍生的子 Context 都会收到通知,这时就可以进行清理操作了,最终释放 goroutine,这就优雅的解决了 goroutine 启动后不可控的问题。
# 接口方法
context的结构体如下
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
2
3
4
5
6
7
8
9
下面简单介绍每个方法的作用
- Deadline 获取设置的截止时间的意思,第一个返回式是截止时间,到了这个时间点,Context 会自动发起取消请求;第二个返回值 ok==false 时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消。
- Done方法 Done 方法返回一个只读的 chan,类型为 struct{},我们在 goroutine 中,如果该方法返回的 chan 可以读取,则意味着 parent context 已经发起了取消请求
- Err方法 Err 方法返回取消的错误原因,因为什么 Context 被取消。
- Value方法 Value 方法获取该 Context 上绑定的值,是一个键值对,所以要通过一个 Key 才可以获取对应的值,这个值一般是线程安全的。
# 创建子context
上面我们使用context.Background返回了一个空的context,那么我们如何产生子context呢,我们可以使用context包提供的with函数
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
2
3
4
这四个 With 函数,接收的都有一个 partent 参数,就是父 Context,我们要基于这个父 Context 创建出子 Context 的意思,这种方式可以理解为子 Context 对父 Context 的继承,也可以理解为基于父 Context 的衍生。
通过这些函数,就创建了一颗 Context 树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个。
- WithCancel 函数传递一个父 Context 作为参数,返回子 Context,以及一个取消函数用来取消 Context。
- WithDeadline 函数,和 WithCancel 差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消 Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消。
- WithTimeout 和 WithDeadline 基本上一样,这个表示是超时自动取消,是多少时间后自动取消 Context 的意思。
- WithValue 函数和取消 Context 无关,它是为了生成一个绑定了一个键值对数据的 Context,这个绑定的数据可以通过 Context.Value 方法访问到,后面我们会专门讲。
前三个函数都返回一个取消函数 CancelFunc,该函数可以 取消一个 Context,以及这个节点 Context下所有的所有的 Context, 不管有多少层级。
# context传递数据
var key string = "name"
func main() {
ctx, cancel := context.WithCancel(context.Background())
//附加值
valueCtx := context.WithValue(ctx, key, "【监控1】")
go watch(valueCtx)
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context) {
for {
select {
case <-ctx.Done():
//取出值
fmt.Println(ctx.Value(key), "监控退出,停止了...")
return
default:
//取出值
fmt.Println(ctx.Value(key), "goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
记住,使用 WithValue 传值,一般是必须的值,不要什么值都传递。
# 使用原则
- 不要把 Context 放在结构体中,要以参数的方式传递
- 以 Context 作为参数的函数方法,应该把 Context 作为第一个参数,放在第一位。
- 给一个函数方法传递 Context 的时候,不要传递 nil,如果不知道传递什么,就使用 context.TODO
- Context 的 Value 相关方法应该传递必须的数据,不要什么数据都使用这个传递
- Context 是线程安全的,可以放心的在多个 goroutine 中传递
参考 Go Context - 知乎 (zhihu.com) (opens new window)
# 同步原语与锁
# 基本原语
Go 语言在 sync
(opens new window) 包中提供了用于同步的一些基本原语,包括常见的 sync.Mutex
(opens new window)、sync.RWMutex
(opens new window)、sync.WaitGroup
(opens new window)、sync.Once
(opens new window) 和 sync.Cond
(opens new window)
上面这些都提供了一些较为基础的同步功能,多数情况下我们应该使用抽象层级更高的channel实现同步
# Mutex
Go 语言的 sync.Mutex
(opens new window) 由两个字段 state
和 sema
组成。其中 state
表示当前互斥锁的状态,而 sema
是用于控制锁状态的信号量。
type Mutex struct {
state int32
sema uint32
}
2
3
4
# 正常模式和饥饿模式
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且 它在队列的末尾或者它等待的时间少于 1ms ,那么当前的互斥锁就会切换回正常模式。
与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时
# 使用
使用 sync.Mutex.Lock
(opens new window) 加锁和 sync.Mutex.Unlock
(opens new window) 解锁。
# 自旋
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:
- 互斥锁只有在普通模式才能进入自旋;
runtime.sync_runtime_canSpin
需要返回true
- 运行在多 CPU 的机器上;
- 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
- 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
# 总结
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
- 如果互斥锁处于初始化状态,会通过置位
mutexLocked
加锁; - 如果互斥锁处于
mutexLocked
状态并且在普通模式下工作,会进入自旋,执行 30 次PAUSE
指令消耗 CPU 时间等待锁的释放; - 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过
runtime.sync_runtime_SemacquireMutex
(opens new window) 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒; - 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
- 当互斥锁已经被解锁时,调用
sync.Mutex.Unlock
(opens new window) 会直接抛出异常; - 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置
mutexLocked
标志位; - 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过
sync.runtime_Semrelease
(opens new window) 唤醒对应的 Goroutine;
# RWMutex
读写互斥锁 sync.RWMutex
(opens new window) 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。
读 | 写 | |
---|---|---|
读 | Y | N |
写 | N | N |
- 写操作使用
sync.RWMutex.Lock
(opens new window) 和sync.RWMutex.Unlock
(opens new window) 方法; - 读操作使用
sync.RWMutex.RLock
(opens new window) 和sync.RWMutex.RUnlock
(opens new window) 方法;
虽然读写互斥锁 sync.RWMutex
(opens new window) 提供的功能比较复杂,但是因为它建立在 sync.Mutex
(opens new window) 上,所以实现会简单很多。我们总结一下读锁和写锁的关系:
- 调用
sync.RWMutex.Lock
尝试获取写锁时;- 每次
sync.RWMutex.RUnlock
(opens new window) 都会将readerCount
其减一,当它归零时该 Goroutine 会获得写锁; - 将
readerCount
减少rwmutexMaxReaders
个数以阻塞后续的读操作;
- 每次
- 调用
sync.RWMutex.Unlock
(opens new window) 释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;
读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。
简单使用的例子
import (
"fmt"
"sync"
)
func main() {
var l *sync.RWMutex
l = new(sync.RWMutex)
l.RUnlock() //1个RUnLock
fmt.Println("1")
l.RLock()
2
3
4
5
6
7
8
9
10
11
# WaitGroup
sync.WaitGroup
(opens new window) 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
// 把请求加入wg中
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
// 处理完一个请求就-1
defer wg.Done()
// res, err := service.call(r)
}(request)
}
// 等待所有请求处理完毕
wg.Wait()
2
3
4
5
6
7
8
9
10
11
12
13
通过对 sync.WaitGroup
(opens new window) 的分析和研究,我们能够得出以下结论:
sync.WaitGroup
(opens new window) 必须在sync.WaitGroup.Wait
(opens new window) 方法返回之后才能被重新使用;sync.WaitGroup.Done
(opens new window) 只是对sync.WaitGroup.Add
(opens new window) 方法的简单封装,我们可以向sync.WaitGroup.Add
(opens new window) 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine;- 可以同时有多个 Goroutine 等待当前
sync.WaitGroup
(opens new window) 计数器的归零,这些 Goroutine 会被同时唤醒;
# Once
Go 语言标准库中 sync.Once
(opens new window) 可以保证在 Go 程序运行期间的某段代码只会执行一次。在运行如下所示的代码时,我们会看到如下所示的运行结果:
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}
$ go run main.go
only once
2
3
4
5
6
7
8
9
10
11
作为用于保证函数执行次数的 sync.Once
(opens new window) 结构体,它使用互斥锁和 sync/atomic
(opens new window) 包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。在使用该结构体时,我们也需要注意以下的问题:
sync.Once.Do
(opens new window) 方法中传入的函数只会被执行一次,哪怕函数中发生了panic
;- 两次调用
sync.Once.Do
(opens new window) 方法传入不同的函数只会执行第一次调传入的函数;
# Cond
Go 语言标准库中还包含条件变量 sync.Cond
(opens new window),它可以让一组的 Goroutine 都在满足特定条件时被唤醒。每一个 sync.Cond
(opens new window) 结构体在初始化时都需要传入一个互斥锁,我们可以通过下面的例子了解它的使用方法:
var status int64
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
// 唤醒所有陷入等待的 Goroutine
func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}
// 等待特定条件的满足
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}
$ go run main.go
listen
...
listen
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sync.Cond.Wait
(opens new window) 方法会将当前 Goroutine 陷入休眠状态,直到被唤醒sync.Cond.Signal
(opens new window) 和sync.Cond.Broadcast
(opens new window) 就是用来唤醒陷入休眠的 Goroutine 的方法,它们的实现有一些细微的差别:sync.Cond.Signal
(opens new window) 方法会唤醒队列最前面的 Goroutine;sync.Cond.Broadcast
(opens new window) 方法会唤醒队列中全部的 Goroutine;
总结
sync.Cond
(opens new window) 不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {}
进行忙碌等待相比,sync.Cond
(opens new window) 能够让出处理器的使用权,提供 CPU 的利用率。使用时我们也需要注意以下问题:
sync.Cond.Wait
(opens new window) 在调用之前一定要使用获取互斥锁,否则会触发程序崩溃;sync.Cond.Signal
(opens new window) 唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;sync.Cond.Broadcast
(opens new window) 会按照一定顺序广播通知等待的全部 Goroutine;
# 扩展原语
# ErrGroup
golang/sync/errgroup.Group
(opens new window) 为我们在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,我们可以使用如下所示的方式并行获取网页的数据:
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
小结
golang/sync/errgroup.Group
(opens new window) 的实现没有涉及底层和运行时包中的 API,它只是对基本同步语义进行了封装以提供更加复杂的功能。我们在使用时也需要注意下面几个问题:
golang/sync/errgroup.Group
(opens new window) 在出现错误或者等待结束后会调用context.Context
(opens new window) 的cancel
方法同步取消信号;- 只有第一个出现的错误才会被返回,剩余的错误会被直接丢弃;
# Semaphore
信号量是在并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动。这个包就提供了带权重的信号量
golang/sync/semaphore.NewWeighted
(opens new window) 用于创建新的信号量;golang/sync/semaphore.Weighted.Acquire
(opens new window) 阻塞地获取指定权重的资源,如果当前没有空闲资源,会陷入休眠等待;golang/sync/semaphore.Weighted.TryAcquire
(opens new window) 非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回false
;golang/sync/semaphore.Weighted.Release
(opens new window) 用于释放指定权重的资源;
小结
带权重的信号量确实有着更多的应用场景,这也是 Go 语言对外提供的唯一一种信号量实现,在使用的过程中我们需要注意以下的几个问题:
golang/sync/semaphore.Weighted.Acquire
(opens new window) 和golang/sync/semaphore.Weighted.TryAcquire
(opens new window) 都可以用于获取资源,前者会阻塞地获取信号量,后者会非阻塞地获取信号量;golang/sync/semaphore.Weighted.Release
(opens new window) 方法会按照先进先出的顺序唤醒可以被唤醒的 Goroutine;- 如果一个 Goroutine 获取了较多地资源,由于
golang/sync/semaphore.Weighted.Release
(opens new window) 的释放策略可能会等待比较长的时间;
# SingleFlight
golang/sync/singleflight.Group
(opens new window) 是 Go 语言扩展包中提供了另一种同步原语,它能够在一个服务中抑制对下游的多次重复请求。一个比较常见的使用场景是:我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。
golang/sync/singleflight.Group.Do
(opens new window) — 同步等待的方法;golang/sync/singleflight.Group.DoChan
(opens new window) — 返回 Channel 异步等待的方法;
小结
当我们需要减少对下游的相同请求时,可以使用 golang/sync/singleflight.Group
(opens new window) 来增加吞吐量和服务质量,不过在使用的过程中我们也需要注意以下的几个问题:
golang/sync/singleflight.Group.Do
(opens new window) 和golang/sync/singleflight.Group.DoChan
(opens new window) 一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并通过 Channel 接收函数的返回值;golang/sync/singleflight.Group.Forget
(opens new window) 可以通知golang/sync/singleflight.Group
(opens new window) 在持有的映射表中删除某个键,接下来对该键的调用就不会等待前面的函数返回了;- 一旦调用的函数返回了错误,所有在等待的 Goroutine 也都会接收到同样的错误;
# 定时器
Go 语言的计时器在并发编程起到了非常重要的作用,它能够为我们提供比较准确的相对时间,基于它的功能,标准库中还提供了定时器、休眠等接口能够我们在 Go 语言程序中更好地处理过期和超时等问题。
更详细的参考:Go 语言并发编程与计时器 | Go 语言设计与实现 (draveness.me) (opens new window)
# Channel
Go 语言中最常见的、也是经常被人提及的设计模式就是: 不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存 这个就是channel的指导思想。
虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)1 (opens new window)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。下图两个goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信
channel存在3种状态:
- nil,未初始化的状态,只进行了声明,或者手动赋值为nil
- active,正常的channel,可读或者可写
- closed,已关闭,千万不要误认为关闭channel后,channel的值是nil
下面我们对channel的三种操作解析:
- 零值(nil)通道;
- 非零值但已关闭的通道;
- 非零值并且尚未关闭的通道。
操作 | 一个零值nil通道 | 一个非零值但已关闭的通道 | 一个非零值且尚未关闭的通道 |
---|---|---|---|
关闭 | 产生恐慌 | 产生恐慌 | 成功关闭 |
发送数据 | 永久阻塞 | 产生恐慌 | 阻塞或者成功发送 |
接收数据 | 永久阻塞 | 永不阻塞 | 阻塞或者成功接收 |
# 底层原理
在我们创建channel时,实际上底层会创建如下的结构
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
2
3
4
5
6
7
8
9
10
11
12
13
14
runtime.hchan
(opens new window) 结构体中的五个字段 qcount
、dataqsiz
、buf
、sendx
、recv
构建底层的循环队列:
qcount
— Channel 中的元素个数;dataqsiz
— Channel 中的循环队列的长度;buf
— Channel 的缓冲区数据指针;sendx
— Channel 的发送操作处理到的位置;recvx
— Channel 的接收操作处理到的位置;
除此之外,elemsize
和 elemtype
分别表示当前 Channel 能够收发的元素类型和大小;sendq
和 recvq
存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 runtime.waitq
(opens new window) 表示,链表中所有的元素都是 runtime.sudog
(opens new window) 结构:
type waitq struct {
first *sudog
last *sudog
}
2
3
4
runtime.sudog
(opens new window) 表示一个在等待列表中的 Goroutine,该结构中存储了两个分别指向前后 runtime.sudog
(opens new window) 的指针以构成链表。
下图是Channel 中存在等待数据的 Goroutine 时,向 Channel 发送数据的过程:
如果有缓存区,就会按照下面这样
参考:Go 语言 Channel 实现原理精要 | Go 语言设计与实现 (draveness.me) (opens new window)
# 调度器
调度器用于管理和调度协程,这里主要涉及到G-M-P调度模型,示意图如下:
# 各个部分介绍
# G — 表示 Goroutine,它是一个待执行的任务;
Goroutine 是 Go 语言调度器中待执行的任务,它在运行时调度器中的地位与线程在操作系统中差不多,但是它占用了更小的内存空间,也降低了上下文切换的开销。
Goroutine 只存在于 Go 语言的运行时,它是 Go 语言在用户态提供的线程,作为一种粒度更细的资源调度单元,如果使用得当能够在高并发的场景下更高效地利用机器的 CPU。
Go程序对创建协程数量是没有限制对,一般上限和操作系统内存有关, 关键是看你在go携程中做什么事情, 例如你在 10000个携程中请求 http , 则会出现 too many files open 这样的错误, 你在10000个携程中批量fmt.println() 也会出现错误或崩溃。 因此:
因此, 简而言之, 崩溃对原因是对系统资源的占用,比如:http端口,打开文件数,内存等。 在程序中,无限个 死循环的 go runtine 还会造成一个bug, 那就是:后面创建的 go runtine 有可能因为内存限制原因不去执行。
# M — 表示操作系统的线程,它由操作系统的调度器调度和管理;
调度器最多可以创建 10000 个线程,但是其中大多数的线程都不会执行用户代码(可能陷入系统调用),最多只会有 GOMAXPROCS
个活跃线程能够正常运行。默认情况下,这个参数为当前机器的核心数(也可以自己修改)。在默认情况下,一个四核机器会创建四个活跃的操作系统线程,每一个线程都对应一个运行时中的 runtime.m
(opens new window) 结构体。
在大多数情况下,我们都会使用 Go 的默认设置,也就是线程数等于 CPU 数,默认的设置不会频繁触发操作系统的线程调度和上下文切换,所有的调度都会发生在用户态,由 Go 语言调度器触发,能够减少很多额外开销。
M的结构体中会有一个 g0
,curg
。g0 是持有调度栈的 Goroutine,curg
是在当前线程上运行的用户 Goroutine,这也是操作系统线程唯一关心的两个 Goroutine。
g0 是一个运行时中比较特殊的 Goroutine,它会深度参与运行时的调度过程,包括 Goroutine 的创建、大内存分配和 CGO 函数的执行。在后面的小节中,我们会经常看到 g0 的身影
# P — 表示处理器,它可以被看做运行在线程上的本地调度器
调度器中的处理器 P 是线程和 Goroutine 的中间层,它能提供线程需要的上下文环境,也会负责调度线程上的等待队列,通过处理器 P 的调度,每一个内核线程都能够执行多个 Goroutine,它能在 Goroutine 进行一些 I/O 操作时及时让出计算资源,提高线程的利用率。
因为调度器在启动时就会创建 GOMAXPROCS
个处理器,所以 Go 语言程序的处理器数量一定会等于 GOMAXPROCS
,这些处理器会绑定到不同的内核线程上。
# 运行流程
新创建的Goroutine会先存 放在Global全局队列中 ,等待Go调度器进行调度,随后Goroutine被分配给其中的一个逻辑处理器P,并放到这个 逻辑处理器对应的Local本地运行队列 中,最终等待被逻辑处理器P执行即可。 在M与P绑定后, M会不断从P的Local队列中无锁地取出G ,并切换到G的堆栈执行,当P的Local队列中 没有G时,再从Global队列中获取一个G,当Global队列中也没有待运行的G时,则尝试从其它的P窃取部分(取一半)G来执行 相当于P之间的负载均衡。
图中灰色的那些goroutine并没有运行,而是处于ready的就绪态,正在等待被调度。P维护着这个队列(称之为runqueue)
当一个OS线程M0陷入阻塞时,P转而在运行M1,图中的M1可能是正被创建,或者从线程缓存中取出。
当MO返回时,它必须尝试取得一个P来运行goroutine,一般情况下,它会从其他的OS线程那里拿一个P过来, 如果没有拿到的话,它就把goroutine放在一个global runqueue
里,然后自己睡眠(放入线程缓存里)。所有的P也会周期性的检查global runqueue
并运行其中的goroutine
另一种情况是P所分配的任务G很快就执行完了(分配不均),这就导致了这个处理器P处于空闲的状态,但是此时其他的P还有任务,此时如果global runqueue没有任务G了,那么这个P就会从其他的P里偷取一些G来执行。
通常来说,如果P从其他的P那里要拿任务的话,一般就拿run queue
的一半,这就确保了每个OS线程都能充分的使用。
# 网络轮询器
# 系统监控
# 原子操作
atomic 提供的原子操作能够确保任一时刻只有一个goroutine对变量进行操作,善用 atomic 能够避免程序中出现大量的锁操作。
atomic常见操作有:
- 增减
- 载入
- 比较并交换
- 交换
- 存储
下面将分别介绍这些操作。
# 增减操作
atomic 包中提供了如下以Add为前缀的增减操作:
- func AddInt32(addr *int32, delta int32) (new int32) (opens new window)
- func AddInt64(addr *int64, delta int64) (new int64) (opens new window)
- func AddUint32(addr *uint32, delta uint32) (new uint32) (opens new window)
- func AddUint64(addr *uint64, delta uint64) (new uint64) (opens new window)
- func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) (opens new window)
需要注意的是,第一个参数必须是指针类型的值,通过指针变量可以获取被操作数在内存中的地址,从而施加特殊的CPU指令,确保同一时间只有一个goroutine能够进行操作。
func main() {
var opts int64 = 0
for i := 0; i < 50; i++ {
go func() {
// 注意第一个参数必须是地址
atomic.AddInt64(&opts, 3) //加操作
//atomic.AddInt64(&opts, -1) 减操作
time.Sleep(time.Millisecond)
}()
}
time.Sleep(time.Second)
fmt.Println("opts: ", atomic.LoadInt64(&opts))
// 最后输出的是150
}
2
3
4
5
6
7
8
9
10
11
12
13
14
上面的例子可以很清楚的看到,我们使用原子操作可以确保最后输出的值为150,如果我们不使用原子操作的话,那么最后的结果可能不是150
func main() {
var opts int64 = 0
for i := 0; i < 50; i++ {
go func() {
opts+=3
time.Sleep(time.Millisecond)
}()
}
time.Sleep(time.Second)
fmt.Println("opts: ", atomic.LoadInt64(&opts))
// 最后输出的是 不一定是150
}
2
3
4
5
6
7
8
9
10
11
12
# 载入操作
atomic 包中提供了如下以Load为前缀的增减操作:
- func LoadInt32(addr *int32) (val int32) (opens new window)
- func LoadInt64(addr *int64) (val int64) (opens new window)
- func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) (opens new window)
- func LoadUint32(addr *uint32) (val uint32) (opens new window)
- func LoadUint64(addr *uint64) (val uint64) (opens new window)
- func LoadUintptr(addr *uintptr) (val uintptr) (opens new window)
载入操作能够保证原子的读变量的值,当读取的时候,任何其他CPU操作都无法对该变量进行读写,其实现机制受到底层硬件的支持。见上述例子中的atomic.LoadInt64(&opts)
。
func main() {
var opts int64 = 0
for i := 0; i < 10; i++ {
go func() {
atomic.AddInt64(&opts,1)
fmt.Println(atomic.LoadInt64(&opts))
//fmt.Println(opts)
time.Sleep(time.Millisecond)
}()
}
time.Sleep(time.Second)
fmt.Println("opts: ", atomic.LoadInt64(&opts))
}
2
3
4
5
6
7
8
9
10
11
12
13
这种的使用比较简单如果要看效果的话,就比较麻烦了
# 比较并交换
该操作简称 CAS(Compare And Swap)。 这类操作的前缀为 CompareAndSwap
:
- func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) (opens new window)
- func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) (opens new window)
- func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) (opens new window)
- func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) (opens new window)
- func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) (opens new window)
该操作在进行交换前首先确保变量的值未被更改,即仍然保持参数 old
所记录的值,满足此前提下才进行交换操作。CAS的做法类似操作数据库时常见的乐观锁机制。
需要注意的是,当有大量的goroutine 对变量进行读写操作时,可能导致CAS操作无法成功,这时可以利用for循环多次尝试。
使用示例:
var value int64
func atomicAddOp(tmp int64) {
for {
oldValue := value
if atomic.CompareAndSwapInt64(&value, oldValue, oldValue+tmp) {
return
}
}
}
2
3
4
5
6
7
8
9
# 交换
此类操作的前缀为 Swap
:
- func SwapInt32(addr *int32, new int32) (old int32) (opens new window)
- func SwapInt64(addr *int64, new int64) (old int64) (opens new window)
- func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) (opens new window)
- func SwapUint32(addr *uint32, new uint32) (old uint32) (opens new window)
- func SwapUint64(addr *uint64, new uint64) (old uint64) (opens new window)
- func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) (opens new window)
相对于CAS,明显此类操作更为暴力直接,并不管变量的旧值是否被改变,直接赋予新值然后返回背替换的值。
# 存储
此类操作的前缀为 Store
:
- func StoreInt32(addr *int32, val int32) (opens new window)
- func StoreInt64(addr *int64, val int64) (opens new window)
- func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) (opens new window)
- func StoreUint32(addr *uint32, val uint32) (opens new window)
- func StoreUint64(addr *uint64, val uint64) (opens new window)
- func StoreUintptr(addr *uintptr, val uintptr) (opens new window)
此类操作确保了写变量的原子性,避免其他操作读到了修改变量过程中的脏数据。
参考:Go 原子操作 - 知乎 (zhihu.com) (opens new window)
# CAS
CAS算法(Compare And Swap),是原子操作的一种, CAS算法是一种有名的无锁算法。无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization)。可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。
该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。
Go中的CAS操作是借用了CPU提供的原子性指令来实现。CAS操作修改共享变量时候不需要对共享变量加锁,而是通过类似乐观锁的方式进行检查,本质还是不断的占用CPU 资源换取加锁带来的开销(比如上下文切换开销)。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var (
counter int32 //计数器
wg sync.WaitGroup //信号量
)
func main() {
threadNum := 5
wg.Add(threadNum)
for i := 0; i < threadNum; i++ {
go incCounter(i)
}
wg.Wait()
}
func incCounter(index int) {
defer wg.Done()
spinNum := 0
for {
// 原子操作
old := counter
// cas本质上是比较和交换,我们可以先获取counter的数据,然后比如我们这里想进行+1的操作
// 此时我们就必须要把counter地址,counter的旧值,counter的新值进行替换
// CPU会检查counter的旧值,判断是否和内存的值一样,如果一样就会交换,否则就报错
// 我们这里就会不断进行循环修改
ok := atomic.CompareAndSwapInt32(&counter, old, old+1)
if ok {
break
} else {
spinNum++
}
}
fmt.Printf("thread,%d,spinnum,%d\n", index, spinNum)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
当主函数main首先创建了5个信号量,然后开启五个线程执行incCounter方法,incCounter内部执行, 使用cas操作递增counter的值,atomic.CompareAndSwapInt32
具有三个参数,第一个是变量的地址,第二个是变量当前值,第三个是要修改变量为多少,该函数如果发现传递的old值等于当前变量的值,则使用第三个变量替换变量的值并返回true,否则返回false。
这里之所以使用无限循环是因为在高并发下每个线程执行CAS并不是每次都成功,失败了的线程需要重写获取变量当前的值,然后重新执行CAS操作。读者可以把线程数改为10000或者更多就会发现输出thread,5329,spinnum,1
其中这个1就说明该线程尝试了两个CAS操作,第二次才成功。
因此呢, go中CAS操作可以有效的减少使用锁所带来的开销,但是需要注意在高并发下这是使用cpu资源做交换的
# CAS的问题
ABA问题 CAS需要在操作值的时候检查内存值是否发生变化,没有发生变化才会更新内存值。但是如果内存值原来是A,后来变成了B,然后又变成了A,那么CAS进行检查时会发现值没有发生变化,但是实际上是有变化的。ABA问题的解决思路就是在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从“A-B-A”变成了“1A-2B-3A”。
循环时间长开销大。CAS操作如果长时间不成功,会导致其一直自旋,给CPU带来非常大的开销。
只能保证一个共享变量的原子操作。对一个共享变量执行操作时,CAS能够保证原子操作,但是对多个共享变量操作时,CAS是无法保证操作的原子性的。
# goroutine
# goroutine状态迁移图
其中 Gidle 在Go调度器代码中并没有被真正被使用到,所以直接忽略
# Grunnable
当我们的协程处于下面这几种状态时会设置为这个状态
- 当我们的协程被创建时,就会置为这个状态,等待调度执行
- 当阻塞任务被唤醒时,也会处于这个状态,这个同样会放入任务队列中,等待调度
- 其他情况:比如从Grunning和Gsyscall状态变换到Grunnable,
总之,处于Grunnable的任务一定在某个任务队列中,随时等待被调度执行。
# Grunning
所有状态为Grunnable的任务都可能通过findrunnable函数被调度器(P&M)获取,进而通过execute函数将其状态切换到Grunning, 最后调用runtime·gogo加载其上下文并执行。
# Gsyscall
Go运行时为了保证高的并发性能,当会在任务执行OS系统调用前,先调用runtime·entersyscall函数将自己的状态置为Gsyscall——如果系统调用是阻塞式的或者执行过久,则将当前M与P分离——当系统调用返回后,执行线程调用runtime·exitsyscall尝试重新获取P,如果成功且当前任务没有被抢占,则将状态切回Grunning并继续执行;否则将状态置为Grunnable,等待再次被调度执行。
# Gwaiting
当一个任务需要的资源或运行条件不能被满足时,需要调用runtime·park函数进入该状态,之后除非等待条件满足,否则任务将一直处于等待状态不能执行。除了之前举过的channel的例子外,Go语言的定时器、网络IO操作都可能引起任务的阻塞。
# Gdead
最后,当一个任务执行结束后,会调用runtime·goexit结束自己的生命——将状态置为Gdead,并将结构体链到一个属于当前P的空闲G链表中,以备后续使用。
参考
# 协程OOM
就是内存耗尽的意思