Go 通道关闭
已关闭的通道的特性:
- 没有一个简单和通用的方法用来在不改变一个通道的状态的情况下检查这个通道是否已经关闭。
- 关闭一个已经关闭的通道将产生一个恐慌,所以在不知道一个通道是否已经关闭的时候关闭此通道是很危险的。
- 向一个已关闭的通道发送数据将产生一个恐慌,所以在不知道一个通道是否已经关闭的时候向此通道发送数据是很危险的。
Go 语言中并没有提供一个内置函数来检查一个通道是否已经关闭。
一、通道关闭的基本方法
通道关闭原则
一个常用的使用 Go 通道的原则是 不要在数据接收方或者在有多个发送者的情况下关闭通道 。换句话说,我们只应该让一个通道唯一的发送者关闭一个此通道。我们可以将称此原则为 通道关闭原则 。
当然,这并不是一个通用的关闭通道的原则。通用的原则是 不要关闭已关闭的通道 。如果我们能够保证从某个时刻之后,再没有协程将向一个未关闭的非 nil 通道发送数据,则一个协程可以安全地关闭此通道。然而,做出这样的保证常常需要很大的努力,从而导致代码过度复杂。另一方面,遵循通道关闭原则是一件相对简单的事儿。
粗鲁地关闭通道的方法
如果由于某种原因,你一定非要从数据接收方或者让众多发送者中的一个关闭一个通道,你可以使用恢复机制来防止可能产生的恐慌而导致程序崩溃。下面就是这样的一个实现(假设通道的元素类型为 T)。
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
// 一个函数的返回结果可以在defer调用中修改。
justClosed = false
}
}()
// 假设ch != nil。
close(ch) // 如果ch已关闭,则产生一个恐慌。
return true // <=> justClosed = true; return
}
此方法违反了通道关闭原则。 同样的方法可以用来粗鲁地向一个关闭状态未知的通道发送数据。
func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
closed = true
}
}()
ch <- value // 如果ch已关闭,则产生一个恐慌。
return false // <=> closed = false; return
}
礼貌地关闭通道的方法
很多 Go 程序员喜欢使用 sync.Once 来关闭通道。
type MyChannel struct {
C chan T
once sync.Once
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.once.Do(func() {
close(mc.C)
})
}
当然,我们也可以使用 sync.Mutex 来防止多次关闭一个通道。
type MyChannel struct {
C chan T
closed bool
mutex sync.Mutex
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.mutex.Lock()
defer mc.mutex.Unlock()
if !mc.closed {
close(mc.C)
mc.closed = true
}
}
func (mc *MyChannel) IsClosed() bool {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}
这些实现确实比上一节中的方法礼貌一些,但是它们不能完全有效地避免数据竞争。目前的 Go 白皮书并不保证发生在一个通道上的并发关闭操作和发送操纵不会产生数据竞争。如果一个 SafeClose 函数和同一个通道上的发送操作同试运行,则数据竞争可能发生(虽然这样的数据竞争一般并不会带来什么危害)。
二、优雅地关闭通道
情形一:M 个接收者和一个发送者。发送者通过关闭用来传输数据的通道来传递发送结束信号
这是最简单的一种情形。当发送者欲结束发送,让它关闭用来传输数据的通道即可。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 100000
const NumReceivers = 100
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int)
// 发送者
go func() {
for {
if value := rand.Intn(Max); value == 0 {
// 此唯一的发送者可以安全地关闭此数据通道。
close(dataCh)
return
} else {
dataCh <- value
}
}
}()
// 接收者
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
// 接收数据直到通道dataCh已关闭
// 并且dataCh的缓冲队列已空。
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}
情形二:一个接收者和 N 个发送者,此唯一接收者通过关闭一个额外的信号通道来通知发送者不要在发送数据了
此情形比上一种情形复杂一些。我们不能让接收者关闭用来传输数据的通道来停止数据传输,因为这样做违反了通道关闭原则。但是我们可以让接收者关闭一个额外的信号通道来通知发送者不要在发送数据了。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
// ...
dataCh := make(chan int)
stopCh := make(chan struct{})
// stopCh是一个额外的信号通道。它的
// 发送者为dataCh数据通道的接收者。
// 它的接收者为dataCh数据通道的发送者。
// 发送者
for i := 0; i < NumSenders; i++ {
go func() {
for {
// 这里的第一个尝试接收用来让此发送者
// 协程尽早地退出。对于这个特定的例子,
// 此select代码块并非必需。
select {
case <- stopCh:
return
default:
}
// 即使stopCh已经关闭,此第二个select
// 代码块中的第一个分支仍很有可能在若干个
// 循环步内依然不会被选中。如果这是不可接受
// 的,则上面的第一个select代码块是必需的。
select {
case <- stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}()
}
// 接收者
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == Max-1 {
// 此唯一的接收者同时也是stopCh通道的
// 唯一发送者。尽管它不能安全地关闭dataCh数
// 据通道,但它可以安全地关闭stopCh通道。
close(stopCh)
return
}
log.Println(value)
}
}()
// ...
wgReceivers.Wait()
}
如此例中的注释所述,对于此额外的信号通道 stopCh,它只有一个发送者,即 dataCh 数据通道的唯一接收者。dataCh 数据通道的接收者关闭了信号通道 stopCh,这是不违反通道关闭原则的。在此例中,数据通道 dataCh 并没有被关闭。是的,我们不必关闭它。当一个通道不再被任何协程所使用后,它将逐渐被垃圾回收掉,无论它是否已经被关闭。所以这里的优雅性体现在通过不关闭一个通道来停止使用此通道。
情形三:M 个接收者和 N 个发送者。它们中的任何协程都可以让一个中间调解协程帮忙发出停止数据传送的信号
这是最复杂的一种情形。我们不能让接收者和发送者中的任何一个关闭用来传输数据的通道,我们也不能让多个接收者之一关闭一个额外的信号通道。这两种做法都违反了通道关闭原则。然而,我们可以引入一个中间调解者角色并让其关闭额外的信号通道来通知所有的接收者和发送者结束工作。具体实现见下例。注意其中使用了一个尝试发送操作来向中间调解者发送信号。
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int)
stopCh := make(chan struct{})
// stopCh是一个额外的信号通道。它的发送
// 者为中间调解者。它的接收者为dataCh
// 数据通道的所有的发送者和接收者。
toStop := make(chan string, 1)
// toStop是一个用来通知中间调解者让其
// 关闭信号通道stopCh的第二个信号通道。
// 此第二个信号通道的发送者为dataCh数据
// 通道的所有的发送者和接收者,它的接收者
// 为中间调解者。它必须为一个缓冲通道。
var stoppedBy string
// 中间调解者
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// 发送者
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
// 为了防止阻塞,这里使用了一个尝试
// 发送操作来向中间调解者发送信号。
select {
case toStop <- "发送者#" + id:
default:
}
return
}
// 此处的尝试接收操作是为了让此发送协程尽早
// 退出。标准编译器对尝试接收和尝试发送做了
// 特殊的优化,因而它们的速度很快。
select {
case <- stopCh:
return
default:
}
// 即使stopCh已关闭,如果这个select代码块
// 中第二个分支的发送操作是非阻塞的,则第一个
// 分支仍很有可能在若干个循环步内依然不会被选
// 中。如果这是不可接受的,则上面的第一个尝试
// 接收操作代码块是必需的。
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// 接收者
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// 和发送者协程一样,此处的尝试接收操作是为了
// 让此接收协程尽早退出。
select {
case <- stopCh:
return
default:
}
// 即使stopCh已关闭,如果这个select代码块
// 中第二个分支的接收操作是非阻塞的,则第一个
// 分支仍很有可能在若干个循环步内依然不会被选
// 中。如果这是不可接受的,则上面尝试接收操作
// 代码块是必需的。
select {
case <- stopCh:
return
case value := <-dataCh:
if value == Max-1 {
// 为了防止阻塞,这里使用了一个尝试
// 发送操作来向中间调解者发送信号。
select {
case toStop <- "接收者#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("被" + stoppedBy + "终止了")
}
请注意,信号通道 toStop 的容量必须至少为 1。如果它的容量为 0,则在中间调解者还未准备好的情况下就已经有某个协程向 toStop 发送信号时,此信号将被抛弃。 我们也可以不使用尝试发送操作向中间调解者发送信号,但信号通道 toStop 的容量必须至少为数据发送者和数据接收者的数量之和,以防止向其发送数据时(有一个极其微小的可能)导致某些发送者和接收者协程永久阻塞。
...
toStop := make(chan string, NumReceivers + NumSenders)
...
value := rand.Intn(Max)
if value == 0 {
toStop <- "sender#" + id
return
}
...
if value == Max-1 {
toStop <- "receiver#" + id
return
}
...
情形四:“M 个接收者和一个发送者”情形的一个变种:用来传输数据的通道的关闭请求由第三方发出
有时,数据通道(dataCh)的关闭请求需要由某个第三方协程发出。对于这种情形,我们可以使用一个额外的信号通道来通知唯一的发送者关闭数据通道(dataCh)。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 100000
const NumReceivers = 100
const NumThirdParties = 15
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int)
closing := make(chan struct{}) // 信号通道
closed := make(chan struct{})
// 此stop函数可以被安全地多次调用。
stop := func() {
select {
case closing<-struct{}{}:
<-closed
case <-closed:
}
}
// 一些第三方协程
for i := 0; i < NumThirdParties; i++ {
go func() {
r := 1 + rand.Intn(3)
time.Sleep(time.Duration(r) * time.Second)
stop()
}()
}
// 发送者
go func() {
defer func() {
close(closed)
close(dataCh)
}()
for {
select{
case <-closing: return
default:
}
select{
case <-closing: return
case dataCh <- rand.Intn(Max):
}
}
}()
// 接收者
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}
情形五:“N 个发送者”的一个变种:用来传输数据的通道必须被关闭以通知各个接收者数据发送已经结束了
在上面的提到的“N 个发送者”情形中,为了遵守通道关闭原则,我们避免了关闭数据通道(dataCh)。但是有时候,数据通道(dataCh)必须被关闭以通知各个接收者数据发送已经结束。对于这种“N 个发送者”情形,我们可以使用一个中间通道将它们转化为“一个发送者”情形,然后继续使用上一节介绍的技巧来关闭此中间通道,从而避免了关闭原始的 dataCh 数据通道。
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 1000000
const NumReceivers = 10
const NumSenders = 1000
const NumThirdParties = 15
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int) // 仍就不会被关闭
middleCh := make(chan int) // 将被关闭
closing := make(chan string)
closed := make(chan struct{})
var stoppedBy string
stop := func(by string) {
select {
case closing <- by:
<-closed
case <-closed:
}
}
// 中间层
go func() {
exit := func(v int, needSend bool) {
close(closed)
if needSend {
dataCh <- v
}
close(dataCh)
}
for {
select {
case stoppedBy = <-closing:
exit(0, false)
return
case v := <- middleCh:
select {
case stoppedBy = <-closing:
exit(v, true)
return
case dataCh <- v:
}
}
}
}()
// 一些第三方协程
for i := 0; i < NumThirdParties; i++ {
go func(id string) {
r := 1 + rand.Intn(3)
time.Sleep(time.Duration(r) * time.Second)
stop("3rd-party#" + id)
}(strconv.Itoa(i))
}
// 发送者
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
stop("sender#" + id)
return
}
select {
case <- closed:
return
default:
}
select {
case <- closed:
return
case middleCh <- value:
}
}
}(strconv.Itoa(i))
}
// 接收者
for range [NumReceivers]struct{}{} {
go func() {
defer wgReceivers.Done()
for value := range dataCh {
log.Println(value)
}
}()
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}