Golang缓冲与无缓冲channel

Golang缓冲与无缓冲channel教程

golang 在语言层面支持并发编程,也就是 goroutine,可以看做一种轻量级的线程。程序启动时,其主函数即在一个单独的 goruntine 中运行,叫做 main goruntine,在程序中通过关键字 go 跟上函数(支持匿名函数)就可以启动一个新的 goroutine,可以叫做 sub goruntine。

在基于多线程设计的并发编程模型中,线程间的通信往往通过共享数据来实现,而保证共享数据的一致性非常关键。如果线程间有竞争条件,那么对共享数据的访问往往需要加锁来保证一致性,而针对不同的访问竞争,比如读/读、读/写、写/写,需要用不同的锁机制,要想兼顾性能和一致性保证需要煞费苦心,尤其是线程间共享数据比较多的时候。

为了更简单的并发编程,go 语言提出了自己的信仰:用通信来共享内存,而不要用共享内存来通信。对于 goroutine 之间的通信,channel 是最好的选择,铭记这句原则:用通信来共享内存,而不要用共享内存来通信,可以帮助我们更好的理解 channel。

channel状态

channel 作为 go 的一种基本数据类型,它有 3 种基本状态:nil、open、closed:

/* nil channel */ var ch = chan string // A channel is in a nil state when it is declared to its zero value ch = nil // A channel can be placed in a nil state /* open channel */ ch := make(chan string) // A channel is in a open state when it’s made using the built-in function make. /* closed channel */ close(ch) // A channel is in a closed state when it’s closed using the built-in function close.

当 channel 处于这 3 种不同的状态时,对于 channel 上的操作也会有不同的行为,理解这些行为对于正确的使用 channel 非常重要。

33_Go语言有缓冲与无缓冲channel使用.png

上面这张图总结了这些行为,需要注意的是处于 closed 状态的 channel,执行 send 操作(ch <- data)将会触发 panic 异常,而 receive 操作(<- ch)则不会,这表明了在 channel 被 close 之后,goruntine 仍然可以从 channel 取走数据,如果 channel 中没有数据可取时,receive 操作会立刻返回零值(nil)。

range 循环可以直接在 channel 上迭代,当 channel 被关闭并且没有数据时可以直接跳出循环。另外,对于 nil 和 closed 状态的 channel 执行 close 操作也会触发 panic 异常。

unbufferd channel和bufferd channel

使用场景

虽然 channel 最常用于 goroutine 之间的通信,但是 channel 上的 send 和 receive 操作并不一定需要携带数据。根据 channel 是否需要传递数据,可以区分出一些 channel 的使用场景。

没有数据的 channel 的使用场景:

  • goroutine A 通过 channel 告诉 goroutine B:”请停止正在做的事情“
  • goroutine A 通过 channel 告诉 goroutine B:”我完成了要做的事情,但是没有任何结果需要反馈“

通知的方式一般是 close 操作,goroutine A 对 channel 执行了 close 操作,而 goruntine B 得到 channel 已经被关闭这个信息后可以执行相应的处理。使用没有数据的 channel 的好处:一个 goroutine 可以同时给多个 goroutine 发送消息,只是这个消息不携带额外的数据,所以常被用于批量 goruntine 的退出。

对于这种不携带数据,只是作为信号的 channel,一般使用如下:

ch := make(chan struct{}) ch <- struct{}{} <- ch

带有数据的 channel 的使用场景:

  • goroutine A 通过 channel 告诉 goroutine B:”请根据我传递给你的数据开始做一件事情“
  • goroutine A 通过 channel 告诉 goroutine B:”我完成了要做的事情,请接收我传递的数据(结果)“

通知的方式就是 goroutine A 执行 send 发送数据,而 goroutine B 执行 receive 接收数据。channel 携带的数据只能被一个 goruntine 得到,一个 goruntine 取走数据后这份数据在 channel 里就不复存在了。

对于需要携带数据的 channel,一般又可以分成带有 buffer 的 channel(bufferd channel)和不带 buffer 的 channel(unbufferd channel)。

unbufferd channel

对于 unbufferd channel,不存储任何数据,只负责数据的流通,并且数据的接收一定发生在数据发送完成之前。更详细的解释是,goroutine A 在往 channel 发送数据完成之前,一定有 goroutine B 在等着从这个 channel 接收数据,否则发送就会导致发送的 goruntine 被 block 住,所以发送和接收的 goruntine 是耦合的。

看下面这个例子,往 ch 发送数据时就使 main gouruntine 被永久 block 住,导致程序死锁。

func main() { var ch = make(chan string) ch <- "hello" //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: fmt.Println(<-ch) }

有人可能会考虑将接收操作放到前面,不幸的是仍然导致了死锁,因为 channel 里没有数据,当前 goruntine 也会被 block 住,导致程序死锁。

func main() { var ch = make(chan string) fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: ch <- "hello" }

这次,我们在另一个 goruntine 中执行 receive,程序就可以正常工作了。因为在 main goruntine 发送完数据之前,sub goroutine 已经在等待接收数据。

func main() { var ch = make(chan string) go func() { fmt.Println(<-ch) //out: hello }() ch <- "hello" }

再看下面这个例子,我们期望在 sub goruntine 中打印 10 个数,实际上却只有 main goruntine 打印了 hello。因为在 sub goruntine 打印之前,main goruntine 就已经执行完成并退出了。

func main() { go func() { for i := 0; i < 10; i++ { fmt.Printf("%d ", i) } }() fmt.Println("hello") }

这个时候就可以用一个 unbufferd channel 来让两个 goruntine 之间有一些通信,让 main goruntine 收到 sub goruntine 通知后再退出。在这种场景中,channel 并不携带任何数据,只是起到一个信号的作用。

func main() { var ch = make(chan string) go func() { for i := 0; i < 10; i++ { fmt.Printf("%d ", i) } ch <- "exit" }() fmt.Println("hello") <-ch }

bufferd channel

对带有缓冲区的 channel 执行 send 和 receive 操作,其行为和不带缓冲区的 channel 不太一样。继续讨论最开始的例子,不过这次的 channel 是一个 size=1 的 bufferd channel,将数据发送给 channel 后,数据被拷贝到 channel 的缓冲区,goruntine 继续往后执行,所以程序可以正常工作。

func main() { var ch = make(chan string, 1) ch <- "hello" fmt.Println(<-ch) //hello }

但是当我们调换发送和接收的顺序时,程序又进入了死锁。因为当 channel 里没有数据时(干涸),执行 receive 的 goroutine 也会被 block 住,最终导致了死锁。

func main() { var ch = make(chan string, 1) fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: ch <- "hello" }

此外,buffer size=1 和 buffer size>1 的 channel 对于数据的交付也有一些细微的不同:

  • 对于 buffer size=1 的 channel,第二个数据发送完成之前,之前发送的第一个数据一定被取走了,否则发送也会被 block 住,这其实说明了数据的交付得到了延迟保证。
  • 对于 buffer size>1 的 channel,发送数据时,之前发送的数据不能保证一定被取走了,并且 buffer size 越大,数据的交付得到的保证越少。也正是由于这种无保证交付,减少了 goroutine 之间通信时的阻塞延迟,根据发送数据、接收数据、数据处理的速度合理的设计 buffer size,甚至可以在不浪费空间的情况下做到没有任何延迟。

如果 channel buffer 已经塞满了数据,继续执行发送会导致当前 goruntine 被 block 住(阻塞),直到 channel 中的数据被取走一部分才可以继续向 channel 发送数据。

通过 channel buffer,解耦了发送和接收的 goruntine。需要小心的是,buffered channel 虽然可以看做一个缓存消息的队列,但是其主要用途还是用于多个 goruntine 之间的通信,单个 goruntine 中不要使用 buffered channel 来做缓存队列,send 和 receive 操作很容让 goruntine 被永久 block 住导致整个程序死锁,上面的 demo 也说明了这件事情。

再看下面这个例子,一个简单的生产消费者模型,manager 每 200ms 有一个新的 work 需要分发给 3 个 worker 来完成,manager 每次都只是将 work 发送到一个 channel 中,work 自动从 channel 中取出 work 并处理,每个 worker 完成一个 work 需要 1s 的时间,manager 累计分发 10 个 work,这个时候我们发现没有阻塞。但是如果 manager 继续不停地分发 work,就会发现 channel 缓冲区被塞满,manager 总是在等待 worker。所以,根据处理需求,合理的设计 worker(goruntine)数量和 channel buffer size 非常重要。

package main import ( "fmt" "math/rand" "strconv" "time" ) func main() { fmt.Println("嗨客网(www.haicoder.net)") const cap = 3 ch := make(chan string, cap) for index := 0; index < cap; index++ { go func() { for p := range ch { fmt.Println("Worker received: ", p) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) } }() } worknum := 10 for index := 0; index < worknum; index++ { time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond) work := "work " + strconv.Itoa(index) select { case ch <- work: fmt.Println("Manager: send a work") default: fmt.Println("Manager: wait worker") } } close(ch) }

运行后,如下图所示:

34_Go语言有缓冲与无缓冲channel使用.png

我们使用了 channel,实现了协程的通信。