Channel
Channel
基本认知
- 并发操作可使用锁解决,但Golang并不推荐使用锁,因为Golang认为锁操作其实是在共享内存空间,语言本身不推荐这种方式。
channel是一种通信机制,可通过通道让一个Goroutine向另一个Goroutine发送数据,以此达到共享内存的目的。- 通道中的数据也存在于内存中,通过通道将其保护起来,使其处于全局。
- 通道中有许多规则,限制了发送方和接收方的行为。
channel是一种数据类型,并且是引用类型,可创建一个环形的数组。channel让Goroutine之间有了通信的能力,从而避免了加锁的问题。
并发哲学
不要通过共享内存来进行通信,而应该通过通信的方式来共享内存。
对比
锁机制
用两个Goroutine去修改一个变量值,变量值空间就是两个Goroutine的共享内存,需要通过锁机制来限制Goroutine的并发操作的原子化。
通道机制
一个Goroutine把数据推进到一个通道内,让其它的Gorouitne通过这个通道去获取数据,在这个通道中设计有发送与接受的规则。

阻塞规则
同步阻塞
无缓存区,数据在发送后,只有在被接收的情况下,才能继续再次发送数据。

异步非阻塞
有缓冲区,在缓冲区未满的情况下,可继续发送数据,是非阻塞状态;而在缓冲区满的情况下是阻塞状态。

基本使用
通过chan类型声明通道:
package main
import "fmt"
func main() {
// 声明创建一个channel
c := make(chan int)
go func() {
// 把1推入或发送到通道
c <- 1
}()
// 从通道中接收数据
value := <-c
fmt.Println(value)
}
无缓冲区
考虑如下程序的输出顺序:
package main
import (
"fmt"
"time"
)
func main() {
// c := make(chan int)
// go func() {
// fmt.Println("Sending 1") // 1
// c <- 1
// fmt.Println("Sending 2") // 2
// c <- 2
// fmt.Println("Sending 3") // 5
// c <- 3
// }()
// v1 := <-c
// fmt.Println("Received", v1) // 3
// v2 := <-c
// fmt.Println("Received", v2) // 4
// v3 := <-c
// fmt.Println("Received", v3) // 6
c := make(chan int)
s := []int{1, 2, 3, 4, 5}
go func() {
defer close(c)
for _, v := range s {
fmt.Println("Sending", v)
c <- v
time.Sleep(1 * time.Second)
}
}()
time.Sleep(5 * time.Second)
for v := range c {
fmt.Println(v)
}
}
在无缓冲区的情况下,数据只能发送一个接收一个,因此如果接收不及时,可能存在阻塞的情况。
有缓冲区
package main
import (
"fmt"
"time"
)
func main() {
// 有缓冲区
c := make(chan int, 5)
s := []int{1, 2, 3, 4, 5}
go func() {
defer close(c)
for _, v := range s {
fmt.Println("Sending", v)
c <- v
time.Sleep(1 * time.Second)
}
}()
time.Sleep(5 * time.Second)
for v := range c {
fmt.Println(v)
}
}
/**
Sending 1
Sending 2
Sending 3
Sending 4
Sending 5
1
2
3
4
5
*/
close
如果Channel永远不关闭,就会造成死锁的错误,GMP在死锁状态下,会将所有的G设置为休眠状态。
发送方关闭Channel。
package main
import "fmt"
func main() {
c := make(chan int)
go func() {
// defer close(c)
c <- 1
}()
v1, ok1 := <-c
// fatal error: all goroutines are asleep - deadlock!
v2, ok2 := <-c
fmt.Println(v1, ok1)
fmt.Println(v2, ok2)
}
接收值的语法v, ok := <-channel有两个返回值:发送的数据与接收到的是否是发送的数据。因此可通过ok来判定接收到的数据是不是另外的G发送的数据,如果不是,则v为对应类型的默认值,
Channel被关闭后,还可以接收数据,但是不能发送数据。如果在关闭的情况下继续发送数据,运行期间会报错。
单向Channel
一个Channel只能发送或者只能接收,可限制双向Channel的操作。
单向Channel完全封闭了Channel的发送或接收功能,业务需求在大部分情况下是无法满足的。
仅发送
一个只能发送数据的Channel。
package main
import "fmt"
func main() {
c := make(chan<- int)
c <- 1
// invalid operation: cannot receive from send-only channel c
v := <-c
}
仅接收
一个只能接收数据的Channel。
package main
import "fmt"
func main() {
c := make(<-chan int)
// invalid operation: cannot send to receive-only channel c
c <- 1
}
死锁场景
阻塞并且无法解开的情况或是程序锁定了。
情况一
当有或者没有缓冲区的Channel没有发送方,如果直接接收会导致死锁。
package main
import "fmt"
func main() {
c := make(chan int)
// fatal error: all goroutines are asleep - deadlock!
<-c
}
情况二
无缓冲区的Channel没有接收方会导致死锁。因为如果没接收方,Channel的数据就无法被消费,是一种无法解开的阻塞。
package main
import "fmt"
func main() {
c := make(chan int)
// fatal error: all goroutines are asleep - deadlock!
c <- 1
}
有缓冲区但缓冲区未满的Channel,如果没有接收方,不会导致死锁。因为Channel里有缓冲区数组保存这些数组,不会死锁。
package main
import "fmt"
func main() {
c := make(chan int, 5)
c <- 1
}
情况三
Channel有缓冲区,但缓冲区已满,如果继续推送数据到Channel,在没有接收方的情况下会导致死锁。
package main
import "fmt"
func main() {
c := make(chan int, 2)
c <- 1
c <- 2
// fatal error: all goroutines are asleep - deadlock!
c <- 3 // 阻塞
}
情况四
c为nil且没有缓冲区和接收方,此时通道还没有创建,数据无法推入,因此会导致死锁。
package main
import "fmt"
func main() {
var c chan int
c <- 1
}
情况五
for range接收Channel数据且发送方没有关闭Channel会造成死锁。
package main
import "fmt"
func main() {
var wg sync.WaitGroup
c := make(chan int, 5)
wg.Add(2)
go func(c chan<- int) {
defer wg.Done()
// defer close(c)
for i := 0; i < 10; i++ {
fmt.Printf("Sending %d \r\n", i)
c <- i
}
}(c)
time.Sleep(3 * time.Second)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
fmt.Println("Received ", v)
}
}(c)
wg.Wait()
}
select
select语句允许一个Goroutine在多个通信操作上等待。
select语句与switch语句类似,但在select语句中,case语句涉及通信,即在通道上进行发送或接收操作。
基本语法
基本语法如下:
select {
case SendOrReceive1: // Statement
case SendOrReceive2: // Statement
case SendOrReceive3: // Statement
.......
default: // Statement
}
阻塞运行
select会阻塞,直到其中一个case可以运行,然后执行该case。
package main
import (
"fmt"
"time"
)
func portal1(c chan string) {
time.Sleep(1 * time.Second)
c <- "Welcome to channel 1"
}
func portal2(c chan string) {
time.Sleep(2 * time.Second)
c <- "Welcome to channel 2"
}
func main() {
R1 := make(chan string)
R2 := make(chan string)
go portal1(R1)
go portal2(R2)
select {
case op1 := <-R1:
fmt.Println(op1)
case op2 := <-R2:
fmt.Println(op2)
}
}
空case
如果一个select语句不包含任何的case语句,那么select语句会一直等待:
package main
func main() {
// fatal error: all goroutines are asleep - deadlock!
select{ }
}
随机性
如果有多个case同时满足条件,那么会随机选择其中一个执行:
package main
import (
"fmt"
)
func portal1(c chan string) {
c <- "Welcome to channel 1"
}
func portal2(c chan string) {
c <- "Welcome to channel 2"
}
func main() {
R1 := make(chan string)
R2 := make(chan string)
go portal1(R1)
go portal2(R2)
select {
case op1 := <-R1:
fmt.Println(op1)
case op2 := <-R2:
fmt.Println(op2)
}
}
循环监听
select监听一次后会直接结束,可使用for进行循环等待:
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
}
如上,会进行2次select的监听,因此两个Goroutine的读取操作均可监听到。
监听发送
select也可用来监听发送数据:
package main
import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
如上,case c <- x表示Goroutine尝试将变量x的值发送到通道c,如果发送成功则执行该case,否则将继续等待监听。
default
一旦Channel阻塞,那么一定会走default:
package main
import (
"fmt"
"time"
)
func main() {
var wg sync.WaitGroup
c := make(chan int)
wg.Add(2)
go func() {
defer wg.Done()
for {
select {
case c <- 1:
fmt.Println("不阻塞")
default:
fmt.Println("阻塞")
}
time.Sleep(2 * time.Second)
}
}()
go func() {
defer wg.Done()
fmt.Println(<-c)
}()
wg.Wait()
}
基本原理
Channel是一个hchanq结构体,结构如下:
- hchan.go
- waitq.go
- sudog.go
type hchan struct {
// total data in the queue
qcount uint
// size of the circular queue
dataqsiz uint
// points to an array of dataqsiz elements
buf unsafe.Pointer
// element size
elemsize uint16
// channel closed flag
closed uint32
// element type
elemtype *_type
// send index
sendx uint
// receive index
recvx uint
// list of recv waiters (blocking goroutines)
recvq waitq
// list of send waiters (blocking goroutines)
sendq waitq
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
// wrap the goroutines
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool
waiters uint16
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

阻塞与不阻塞
Channel的读取操作不同的场景可能产生阻塞或不阻塞:
不阻塞
- 缓冲区有数据。
- 缓冲区满了或没有缓冲区,但是
sendq有待发送数据的Goroutine。
阻塞
- Channel是
nil。 - Chaneel没有缓冲区,
sendq也没有等待的Goroutine。 - 有缓冲区但是缓存区没有数据,并且
sendq也没有等待的Goroutine。
close
- Channel只声明了,并未初始化,关闭时会报错。
package main
var c chan string
func main() {
// panic: close of nil channel
close(c)
- 不能多次关闭同一个Channel。
package main
var c = make(chan string)
func main() {
close(c)
// panic: close of closed channel
close(c)
}
- 已关闭的Channel的再次发送数据会报错。Channel关闭以后会唤醒所有等待的Goroutine并放到一个
glist链表,然后遍历该链表,遇到写入操作的Goroutine会panic,遇到读取操作的Goroutine会继续执行。
package main
import "time"
func main() {
var c = make(chan string, 3)
go func() {
c <- "a"
c <- "b"
close(c)
// panic: send on closed channel
c <- "c"
}()
time.Sleep(2 * time.Second)
}