本文源码版本基于 go1.21.13
# channel
Golang 中使用 CSP 中 channel 的概念。channel 是被单独创建并且可以在进程之间传递,它的通信模式类似于 boss-worker 模式,一个实体通过将消息发送到 channel 中,然后由监听这个 channel 的实体处理,两个实体之间是没有互相感知的,这个就实现实体之间的解耦。
# chan 结构
type hchan struct { | |
qcount uint //buffer 中已放入的元素个数 | |
dataqsiz uint // 用户构造 channel 时指定的 buf 大小 | |
buf unsafe.Pointer // buffer | |
elemsize uint16 //buffer 中每个元素的大小 | |
closed uint32 //channel 是否关闭,== 0 代表未 closed | |
elemtype *_type //channel 元素的类型信息 | |
sendx uint //buffer 中已发送的索引位置 send index | |
recvx uint //buffer 中已接收的索引位置 receive index | |
recvq waitq // 等待接收的 goroutine list of recv waiters | |
sendq waitq // 等待发送的 goroutine list of send waiters | |
lock mutex | |
} | |
type waitq struct { | |
first *sudog | |
last *sudog | |
} |
# 初始化
如果当前 channel 中不存在缓冲区,那么就只会为 hchan 分配一段内存空间;
如果当前 channel 中存储的类型不是指针类型,就会直接为当前的 Channel 和底层的数组分配一块连续的内存空间;
在默认情况下会单独为 hchan 和 buff 分配内存;
func makechan(t *chantype, size int) *hchan { | |
elem := t.Elem | |
// compiler checks this but be safe. | |
if elem.Size_ >= 1<<16 { | |
// 元素大小超过限制,抛出异常 | |
throw("makechan: invalid channel element type") | |
} | |
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign { | |
// 确保合理的对齐 | |
throw("makechan: bad alignment") | |
} | |
// 计算内存大小,并检查是否溢出 | |
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size)) | |
if overflow || mem > maxAlloc-hchanSize || size < 0 { | |
panic(plainError("makechan: size out of range")) | |
} | |
// 如果 buff 中的元素不包含指针, Hchan 对 GC 来说并不重要。(如果缓冲区中的元素不包含指针,那么这些元素就不需要被垃圾回收器扫描。) | |
// 缓冲区指向相同的分配区,并且元素类型是持久的。 (大概意思是不会被垃圾回收?) | |
// TODO: 重新考虑何时收集器可以移动已分配的对象。 | |
var c *hchan | |
switch { | |
case mem == 0: | |
// 队列或元素的大小为零。 | |
c = (*hchan)(mallocgc(hchanSize, nil, true)) | |
c.buf = c.raceaddr() | |
case elem.PtrBytes == 0: | |
// 元素不包含指针。 | |
// 一次性分配 hchan 和缓冲区。 | |
c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) | |
c.buf = add(unsafe.Pointer(c), hchanSize) | |
default: | |
// 元素包含指针。 | |
c = new(hchan) | |
c.buf = mallocgc(mem, elem, true) | |
} | |
// 初始化其他字段 | |
c.elemsize = uint16(elem.Size_) | |
c.elemtype = elem | |
c.dataqsiz = uint(size) | |
lockInit(&c.lock, lockRankHchan) | |
if debugChan { | |
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n") | |
} | |
return c | |
} |
func (c *hchan) raceaddr() unsafe.Pointer { | |
// 对通道的类似读写操作视为在这个地址进行操作。避免使用 qcount 或 dataqsiz 的地址, | |
// 因为 len () 和 cap () 内置函数会读取这些地址,而我们不希望它们与像 close () 这样的操作发生 data race。 | |
return unsafe.Pointer(&c.buf) | |
} |
// empty reports whether a read from c would block (that is, the channel is | |
// empty). It uses a single atomic read of mutable state. | |
func empty(c *hchan) bool { | |
// c.dataqsiz is immutable. | |
if c.dataqsiz == 0 { | |
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil | |
} | |
return atomic.Loaduint(&c.qcount) == 0 | |
} |
# 等待队列
type waitq struct { | |
first *sudog | |
last *sudog | |
} | |
//sudog 就是 goroutine |
//enqueue 方法将一个 sudog 指针加入等待队列中。 | |
// 这个函数实现了一个双向链表的尾部插入操作。 | |
// 如果队列为空,它会初始化 first 和 last 指针;否则,它会将新元素插入到链表末尾。 | |
func (q *waitq) enqueue(sgp *sudog) { | |
sgp.next = nil | |
x := q.last | |
if x == nil { | |
sgp.prev = nil | |
q.first = sgp | |
q.last = sgp | |
return | |
} | |
sgp.prev = x | |
x.next = sgp | |
q.last = sgp | |
} | |
func (q *waitq) dequeue() *sudog { | |
for { | |
sgp := q.first | |
if sgp == nil { | |
return nil | |
} | |
y := sgp.next | |
if y == nil { | |
q.first = nil | |
q.last = nil | |
} else { | |
y.prev = nil | |
q.first = y | |
sgp.next = nil // 标记为已移除(参见 dequeueSudoG) | |
} | |
// 如果一个 goroutine 由于 select 而被放入此队列, | |
// 那么在 goroutine 被其他 case 唤醒和获取通道锁之间有一个小的时间窗。 | |
// 一旦它获取了锁,它会将自己从队列中移除,因此我们之后不会再看到它。 | |
// 我们在 G 结构中使用一个标志来告诉我们是否有其他人已经赢得了竞争去唤醒这个 goroutine, | |
// 但这个 goroutine 还没有从队列中移除自己。 | |
if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) { | |
continue | |
} | |
return sgp | |
} | |
} |
# 写入
当 channel 的 sendq 队列中包含处于等待状态的 goroutine 时,取出队列头的 goroutine,直接调用 send。
当有缓冲区时,并且缓冲区未满时,写入缓冲区。
当没有缓冲区时或者缓冲区已满时,阻塞接收。
向 已经 close 的 channel 写入数据会导致 panic
// entry point for c <- x from compiled code. | |
// | |
//go:nosplit | |
func chansend1(c *hchan, elem unsafe.Pointer) { | |
//getcallerpc 获取程序计数器 | |
chansend(c, elem, true, getcallerpc()) | |
} | |
//chansend 是向通道中发送数据的函数 如果 block 参数不为 nil,则不会 sleep,如果无法成功发送则直接 return。 | |
//chansend 会向 buff 写数据 or 向等待接收队列写数据 | |
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { | |
// 如果 channel 是 nil | |
if c == nil { | |
if !block { | |
return false | |
} | |
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2) | |
throw("unreachable") | |
} | |
if debugChan { | |
print("chansend: chan=", c, "\n") | |
} | |
if raceenabled { | |
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend)) | |
} | |
// Fast path: 在未获取锁的情况下检查非阻塞操作是否失败。 | |
// | |
// 在观察到通道未关闭之后,观察通道是否准备好发送。这两个观察都是 a single word-sized 读取(首先是 c.closed 然后是 full ())。 | |
// 即使通道在两次观察之间关闭,关闭的通道也不能从「准备发送」状态变为「未准备好发送」, | |
// 在这个时刻通道既未关闭也未准备发送。我们的行为就像是在那个时刻观察到了通道,并报告发送无法进行。 | |
// | |
// 这里的读操作顺序可以重排序:如果我们观察到通道未准备好发送然后观察到它未关闭,这意味着在第一次观察期间通道未关闭。然而,这里没有任何东西保证向前推进。 | |
// 我们依赖于 chanrecv () 和 closechan () 中锁释放的副作用来更新此线程对 c.closed 和 full () 的观察。 | |
if !block && c.closed == 0 && full(c) { | |
return false | |
} | |
var t0 int64 | |
if blockprofilerate > 0 { | |
t0 = cputicks() | |
} | |
// 加锁 | |
lock(&c.lock) | |
// 已被关闭 | |
if c.closed != 0 { | |
unlock(&c.lock) | |
panic(plainError("send on closed channel")) | |
} | |
if sg := c.recvq.dequeue(); sg != nil { | |
// 找到了一个等待接收的 goroutine。直接将要发送的值直接传递给 goroutine,绕过通道缓冲区)。 | |
send(c, sg, ep, func() { unlock(&c.lock) }, 3) | |
return true | |
} | |
if c.qcount < c.dataqsiz { | |
//buff 中有可用空间。将待发送的元素入队。 | |
//qp 指向 buf 的 sendx 位置 | |
qp := chanbuf(c, c.sendx) | |
if raceenabled { | |
racenotify(c, c.sendx, nil) | |
} | |
// 将数据从 ep 处拷贝到 qp | |
typedmemmove(c.elemtype, qp, ep) | |
// 发送的游标加 1,如果发送的游标值等于容量值,游标值归 0 | |
c.sendx++ | |
if c.sendx == c.dataqsiz { | |
c.sendx = 0 | |
} | |
// 缓冲区的数量加 1 | |
c.qcount++ | |
unlock(&c.lock) | |
return true | |
} | |
//buff 空间已经满了 | |
// 如果不需要阻塞,则直接返回错误 | |
if !block { | |
unlock(&c.lock) | |
return false | |
} | |
// 通道阻塞。某个 goroutine 会为我们完成操作。 | |
gp := getg() | |
mysg := acquireSudog() | |
mysg.releasetime = 0 | |
if t0 != 0 { | |
mysg.releasetime = -1 | |
} | |
// 阻塞操作 | |
// No stack splits between assigning elem and enqueuing mysg | |
// on gp.waiting where copystack can find it. | |
mysg.elem = ep | |
mysg.waitlink = nil | |
mysg.g = gp | |
mysg.isSelect = false | |
mysg.c = c | |
gp.waiting = mysg | |
gp.param = nil | |
c.sendq.enqueue(mysg) | |
// 向任何试图缩小堆栈的人发出信号,表明我们即将在一个 channel 上放数据。 | |
// 从 G 的状态改变到我们设置 gp.activeStackChans 的这段时间内,堆栈缩小是不安全的。 | |
gp.parkingOnChan.Store(true) | |
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) | |
// 确保发送的值在接收者将其复制出来之前一直保持有效。 | |
// 虽然 sudog 拥有指向堆栈对象的指针,但 sudog 并不被视为堆栈跟踪的根。 | |
KeepAlive(ep) | |
// 被唤醒 | |
if mysg != gp.waiting { | |
throw("G waiting list is corrupted") | |
} | |
gp.waiting = nil | |
gp.activeStackChans = false | |
closed := !mysg.success | |
gp.param = nil | |
if mysg.releasetime > 0 { | |
blockevent(mysg.releasetime-t0, 2) | |
} | |
mysg.c = nil | |
releaseSudog(mysg) | |
if closed { | |
if c.closed == 0 { | |
throw("chansend: spurious wakeup") | |
} | |
panic(plainError("send on closed channel")) | |
} | |
return true | |
} |
//send 在空通道 c 上执行发送操作。 | |
// 发送者发送的值 ep 被复制到接收者 sg。 然后,接收者被唤醒并继续其正常操作。 | |
// 通道 c 必须为空且已锁定。send 使用 unlockf 函数解锁 c。 | |
//sg 必须已从 c 中出列。ep 必须非空且指向堆或调用者的堆栈。 | |
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { | |
if raceenabled { | |
if c.dataqsiz == 0 { | |
racesync(c, sg) | |
} else { | |
// 假装经过缓冲区,实际上是直接复制 | |
// 注意只有在启用竞争条件检测时,才需要增加头 / 尾位置。 | |
racenotify(c, c.recvx, nil) | |
racenotify(c, c.recvx, sg) | |
c.recvx++ | |
if c.recvx == c.dataqsiz { | |
c.recvx = 0 | |
} | |
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz | |
} | |
} | |
if sg.elem != nil { | |
// 直接拷贝数据 | |
sendDirect(c.elemtype, sg, ep) | |
sg.elem = nil | |
} | |
gp := sg.g | |
unlockf() | |
gp.param = unsafe.Pointer(sg) | |
sg.success = true | |
if sg.releasetime != 0 { | |
sg.releasetime = cputicks() | |
} | |
goready(gp, skip+1) | |
} |
// 在无缓冲或空缓冲通道上发送和接收是唯一一个运行的 goroutine 写入另一个运行 goroutine 的堆栈的操作。 | |
// GC 假设堆栈写入仅在 goroutine 运行时发生,并且仅由该 goroutine 完成。 | |
// 使用写屏障足以弥补违反这一假设,但写屏障必须起作用。typedmemmove 将调用 bulkBarrierPreWrite,但目标字节不在堆中,因此这无济于事。我们安排调用 memmove 并键入 BitsBulkBarrier。 | |
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { | |
// src is on our stack, dst is a slot on another stack. | |
// Once we read sg.elem out of sg, it will no longer | |
// be updated if the destination's stack gets copied (shrunk). | |
// So make sure that no preemption points can happen between read & use. | |
dst := sg.elem | |
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_) | |
// No need for cgo write barrier checks because dst is always | |
// Go memory. | |
memmove(dst, src, t.Size_) | |
} |
# 接收
当 channel 被关闭时,缓冲区中没数据,返回
当 channel 被关闭时,sendq 队列中有处于等待状态的 goroutine 取出队列头的 goroutine,直接调用 recv
当 channel 的缓冲区中有数据时,读取缓冲区数据
当 channel 没有被关闭,缓冲区没有数据,阻塞接收。
// entry points for <- c from compiled code. | |
//go:nosplit | |
func chanrecv1(c *hchan, elem unsafe.Pointer) { | |
chanrecv(c, elem, true) | |
} | |
//go:nosplit | |
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { | |
_, received = chanrecv(c, elem, true) | |
return | |
} | |
//chanrecv 在通道 c 上接收数据,并将接收到的数据写入 ep。 | |
//ep 可能为 nil,此时接收到的数据将被忽略。 | |
// 如果 block == false 且没有可用的元素,则返回 (false, false)。 | |
// 否则,如果 c 被关闭,则将 *ep 清零并返回 (true, false)。 | |
// 否则,将一个元素填入 *ep 并返回 (true, true)。 | |
// 非 nil 的 ep 必须指向堆或调用者的栈。 | |
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { | |
//raceenabled: 不需要检查 ep,因为它总是在栈上或者由反射新分配的内存中。 | |
if debugChan { | |
print("chanrecv: chan=", c, "\n") | |
} | |
if c == nil { | |
if !block { | |
return | |
} | |
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2) | |
throw("unreachable") | |
} | |
// Fast path: 检查非阻塞操作的失败,而不获取锁。 | |
if !block && empty(c) { | |
// 在观察到 channel !block 后,我们观察通道是否关闭。 | |
// 重新排序这些检查可能会导致在 data race close 时出现不正确的行为。 | |
// 例如,如果 channel 是打开的且不为空,然后被关闭,然后被耗尽,重新排序可能会错误地指示 “打开且为空”” | |
// 为了防止重新排序,我们对这两种检查都使用 atomic loads,并依赖于在同一锁的分离临界区内完成清空和关闭。。 | |
// 当关闭一个有阻塞发送的无缓冲通道时,这一假设会失效,但那是一个错误状态。 | |
if atomic.Load(&c.closed) == 0 { | |
// 因为通道不能被重新打开,通道稍后的未关闭状态观察意味着它在第一次观察时也未关闭。 | |
// 我们相当于观察到了那一刻的通道并报告接收不能继续。 | |
return | |
} | |
// 通道不可逆转地关闭了。重新检查通道是否有待接收的数据, | |
// 这些数据可能在上面的空和关闭检查之间到达。与这种发生竞争时也需要顺序一致性。 | |
if empty(c) { | |
// 通道不可逆关闭且为空。 | |
if raceenabled { | |
raceacquire(c.raceaddr()) | |
} | |
if ep != nil { | |
typedmemclr(c.elemtype, ep) | |
} | |
return true, false | |
} | |
} | |
var t0 int64 | |
if blockprofilerate > 0 { | |
t0 = cputicks() | |
} | |
lock(&c.lock) | |
if c.closed != 0 { | |
if c.qcount == 0 { | |
if raceenabled { | |
raceacquire(c.raceaddr()) | |
} | |
unlock(&c.lock) | |
if ep != nil { | |
typedmemclr(c.elemtype, ep) | |
} | |
return true, false | |
} | |
} else { // 通道已关闭,但通道的缓冲区中有数据。 | |
if sg := c.sendq.dequeue(); sg != nil { // 刚找到了等待的发送者且仍未关闭。 | |
// 找到等待的发送者。如果缓冲区大小为 0,则从发送者直接接收值。 | |
// 否则,从队列头接收数据,并将发送者的值添加到队列尾(因为队列已满)。 | |
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) | |
return true, true | |
} | |
} | |
if c.qcount > 0 { | |
// 直接从队列接收 | |
qp := chanbuf(c, c.recvx) | |
if raceenabled { | |
racenotify(c, c.recvx, nil) | |
} | |
if ep != nil { | |
typedmemmove(c.elemtype, ep, qp) | |
} | |
typedmemclr(c.elemtype, qp) | |
c.recvx++ | |
if c.recvx == c.dataqsiz { | |
c.recvx = 0 | |
} | |
c.qcount-- | |
unlock(&c.lock) | |
return true, true | |
} | |
if !block { | |
unlock(&c.lock) | |
return false, false | |
} | |
// 没有可用的发送者:在该通道上阻塞。 | |
gp := getg() | |
mysg := acquireSudog() | |
mysg.releasetime = 0 | |
if t0 != 0 { | |
mysg.releasetime = -1 | |
} | |
// 在分配 elem 和将 mysg 入队到 gp.waiting 之间不进行堆栈拆分 | |
//copystack 可以找到它。 | |
mysg.elem = ep | |
mysg.waitlink = nil | |
gp.waiting = mysg | |
mysg.g = gp | |
mysg.isSelect = false | |
mysg.c = c | |
gp.param = nil | |
c.recvq.enqueue(mysg) | |
// 向任何试图缩小堆栈的人发出信号,表明我们即将在通道上阻塞。 | |
// 此 G 的状态改变与我们设置 gp.activeStackChans 之间的时间窗口对栈缩小不安全 | |
gp.parkingOnChan.Store(true) | |
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) | |
// 被唤醒 | |
if mysg != gp.waiting { | |
throw("G waiting list is corrupted") | |
} | |
gp.waiting = nil | |
gp.activeStackChans = false | |
if mysg.releasetime > 0 { | |
blockevent(mysg.releasetime-t0, 2) | |
} | |
success := mysg.success | |
gp.param = nil | |
mysg.c = nil | |
releaseSudog(mysg) | |
return true, success | |
} |
//recv 处理满通道 c 上的接收操作。 | |
// 包含 2 部分: | |
// 1. 由发送者 sg 发送的值被放入通道,并唤醒发送者去执行其他操作。 | |
// 2. 接收者(当前 Goroutine)接收的值写入 ep。 | |
// | |
// 对于同步通道,两个值相同。 | |
// 对于异步通道,接收者从通道缓冲区获取其数据,而发送者的数据被放入通道缓冲区。 | |
// 通道 c 必须已满且被锁定。recv 用 unlockf 解锁 c。 | |
//sg 必须已出列。 | |
// 非 nil 的 ep 必须指向堆或调用者的栈。 | |
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { | |
if c.dataqsiz == 0 { | |
if raceenabled { | |
racesync(c, sg) | |
} | |
if ep != nil { | |
// 从发送者拷贝数据 | |
recvDirect(c.elemtype, sg, ep) | |
} | |
} else { | |
// 队列已满。取队列头的项目。使发送者将其项目入队到队列尾。 | |
// 由于队列已满,这两个位置都是同一个槽。 | |
qp := chanbuf(c, c.recvx) | |
if raceenabled { | |
racenotify(c, c.recvx, nil) | |
racenotify(c, c.recvx, sg) | |
} | |
// 从队列复制数据到接收者 | |
if ep != nil { | |
typedmemmove(c.elemtype, ep, qp) | |
} | |
// 从发送者复制数据到队列 | |
typedmemmove(c.elemtype, qp, sg.elem) | |
c.recvx++ | |
if c.recvx == c.dataqsiz { | |
c.recvx = 0 | |
} | |
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz | |
} | |
sg.elem = nil | |
gp := sg.g | |
unlockf() | |
gp.param = unsafe.Pointer(sg) | |
sg.success = true | |
if sg.releasetime != 0 { | |
sg.releasetime = cputicks() | |
} | |
goready(gp, skip+1) | |
} |
# 关闭
在函数执行的最后会为所有被阻塞的 Goroutine 调用 goready 函数重新对这些协程进行调度.
close nil channel 和 close 已经关闭的 channel 会导致 panic
//go:linkname reflect_chanclose reflect.chanclose | |
func reflect_chanclose(c *hchan) { | |
closechan(c) | |
} | |
func closechan(c *hchan) { | |
if c == nil { | |
panic(plainError("close of nil channel")) | |
} | |
lock(&c.lock) | |
if c.closed != 0 { | |
unlock(&c.lock) | |
panic(plainError("close of closed channel")) | |
} | |
if raceenabled { | |
callerpc := getcallerpc() | |
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) | |
racerelease(c.raceaddr()) | |
} | |
c.closed = 1 | |
var glist gList | |
// 释放所有在接收队列中的 goroutine | |
for { | |
sg := c.recvq.dequeue() | |
if sg == nil { | |
break | |
} | |
if sg.elem != nil { | |
typedmemclr(c.elemtype, sg.elem) | |
sg.elem = nil | |
} | |
if sg.releasetime != 0 { | |
sg.releasetime = cputicks() | |
} | |
// 将当前接收 goroutine 准备好 | |
gp := sg.g | |
gp.param = unsafe.Pointer(sg) | |
sg.success = false | |
if raceenabled { | |
raceacquireg(gp, c.raceaddr()) | |
} | |
// 将准备好的 goroutine 推到列表中 | |
glist.push(gp) | |
} | |
// 释放所有在发送队列中的 goroutine (they will panic) | |
for { | |
sg := c.sendq.dequeue() | |
if sg == nil { | |
break | |
} | |
sg.elem = nil | |
if sg.releasetime != 0 { | |
sg.releasetime = cputicks() | |
} | |
gp := sg.g | |
gp.param = unsafe.Pointer(sg) | |
sg.success = false | |
if raceenabled { | |
raceacquireg(gp, c.raceaddr()) | |
} | |
glist.push(gp) | |
} | |
unlock(&c.lock) | |
// 现在已经 unlock 准备所有 goroutine。 | |
for !glist.empty() { | |
gp := glist.pop() | |
gp.schedlink = 0 | |
goready(gp, 3) | |
} | |
} |
# 参考文章
https://github.com/LeoYang90/Golang-Internal-Notes/blob/master/Go Channel.md
http://legendtkl.com/2017/07/30/understanding-golang-channel/