八股文-Golang Select和Channel

golang并发模型基于通信顺序进程(Communicating sequential processes,CSP)实现。goroutine和channel分别对应CSP中的实体和传递信息的媒介,goroutine之间会通过channel传递数据。

目前的channel收发操作也遵循先进先出的设计(1.6版本之后)
具体规则:

  • 先从channel读取数据的Goroutine先接收数据;
  • 先向channel发送数据的Goroutine先发送数据;

channel在实现上是个有锁队列

select可以通过default语句实现非阻塞的channel发送和接收,此时在代码中体现是chansendchanrecvblock参数。

1.select实现

1.1.select行为

  1. select不存在任何的case:直接block
  2. select只存在一个case:channel为nil时阻塞,否则行为回退至x, ok = <-c
  3. select存在两个case,其中一个是default:非阻塞发送/接收,若失败则走default语句
  4. select存在多个case:若存在多个ready的channel,随机执行,否则block到可以执行为止

1.2.实现原理

select语句在编译期间会被转换成OSELECT节点。每个OSELECT节点对应一组OCASE节点,若OCASE的执行条件是空,那就意味着这是一个default节点。
编译器在中间代码生成期间会根据selectcase的不同对控制语句进行优化,具体代码在gc.walkselectcases函数中

默认情况下,调用selectgo函数获取可执行的scase结构体。
selectgo函数首先会进行执行必要的初始化操作并决定处理case的两个顺序,轮询顺序pollOrder和加锁顺序lockOrder(两个数组,用于后续的case遍历):
轮询顺序:通过runtime.fastrandn函数引入随机性;
加锁顺序:按照channel的地址排序后确定加锁顺序;

scase结构体包含channel的指针,因此在轮询过程中,可以直接访问hchan结构体获取channel的发送接收状态,并进行接收或者发送(逻辑基本上是重写chansendchanrecv)。

1
2
3
4
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}

2.channel数据结构

sendq和recvq至少有一个为空(除非单个goroutine同时发送接收到一个缓冲channel)
对于带缓冲的channel,qcount > 0代表recvq为空,qcount < dataqsiz代表sendq为空.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type hchan struct {
qcount uint // channel中元素个数
dataqsiz uint // 循环队列大小
buf unsafe.Pointer // 循环队列/缓冲区地址
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // 发送索引 0 <= sendx < dataqsiz
recvx uint // 接收索引 0 <= recvx < dataqsiz
recvq waitq // 阻塞的goroutine双向列表 (sudog里面有前后指针)
sendq waitq // 阻塞的goroutine双向列表

// lock保护hchan中的所有字段,以及channel阻塞的sudog部分字段
lock mutex
}

type waitq struct {
first *sudog
last *sudog
}

3.channel发送数据

数据发送通过chansend函数实现,分为三种情况:

  1. 存在recvq:当存在等待的接收者时,通过send直接将数据发送给阻塞的接收者;
  2. 发送至缓冲区:当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区;
  3. 发送阻塞:当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据;

3.1.存在recvq

如果recvq存在,且channel未关闭,chansend函数会从recvq取最先等待的goroutine并发送数据。
发送数据之后,send函数会调用goready(gp, skip+1),标记goroutine为_Grunnable,但不立即执行。

1
2
3
4
5
6
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

3.2.发送至缓冲区

通过chanbuf计算下一个位置的指针,然后调用typedmemmove将发送的数据复制至缓冲区中,并增加sendxqcount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // 计算指针位置
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

3.3.发送阻塞

  1. 调用getg获取发送数据使用的goroutine;
  2. 执行acquireSudog获取sudog,并设置阻塞发送的相关信息,例如发送的channel、是否在select中和待发送数据的内存地址等;
  3. sudog实例加入发送等待队列,并设置goroutine的parkingOnChan属性;
  4. 调用goparkunlock将当前的goroutine陷入等待;
  5. 被调度器唤醒后,说明数据已经发送成功,清理goroutine属性,并释放sudog结构体;

4.channel接收数据

接收数据通过chanrecv函数实现,情况和发送类似:

  1. 存在sendq:当存在等待的发送者时,通过recv从阻塞的发送者或者缓冲区中获取数据;
  2. 从缓冲区接收:当缓冲区存在数据时,从channel的缓冲区中接收数据;
  3. 接收阻塞:当缓冲区中不存在数据时,等待其他goroutine向channel发送数据;

4.1.存在sendq

存在sendq和存在recvq的在接收发送阶段的行为会有点区别:发送时存在recvq,缓冲区一定不存在数据;接收时,sendq的存在表示缓冲区数据时满的。
这个区别导致recv函数的实现会比send更复杂。
recv函数区分了循环队列大小(dataqsiz)大小是否为0的情况。
无缓冲区(dataqsiz==0)时,直接从sendq的出队sudog中获取数据;否则,将缓冲区头数据出队,并将sendq中的对应元素入队(考虑到缓冲区队列此刻是满的,因此操作的是一个位置,重点是需要修改recvxsendx)。

接收之后,需要将sudog中对应的goroutine通过goready(gp, skip+1)唤醒(但不立刻调度)。

4.2.从缓冲区接收

通过chanbuf获取缓冲区指针之后,复制数据,然后修改recvxqcount,并解锁lock
此时不涉及发送相关的数据,因此不需要修改sendx

4.3.接收阻塞

和发送的阻塞操作一致,入队并阻塞,等待唤醒。