Skip to main content

Channel

Channel

基本认知

  1. 并发操作可使用锁解决,但Golang并不推荐使用锁,因为Golang认为锁操作其实是在共享内存空间,语言本身不推荐这种方式。
  2. channel是一种通信机制,可通过通道让一个Goroutine向另一个Goroutine发送数据,以此达到共享内存的目的。
  3. 通道中的数据也存在于内存中,通过通道将其保护起来,使其处于全局。
  4. 通道中有许多规则,限制了发送方和接收方的行为。
  5. channel是一种数据类型,并且是引用类型,可创建一个环形的数组。
  6. channel让Goroutine之间有了通信的能力,从而避免了加锁的问题。

并发哲学

不要通过共享内存来进行通信,而应该通过通信的方式来共享内存。

对比

锁机制

用两个Goroutine去修改一个变量值,变量值空间就是两个Goroutine的共享内存,需要通过锁机制来限制Goroutine的并发操作的原子化。

通道机制

一个Goroutine把数据推进到一个通道内,让其它的Gorouitne通过这个通道去获取数据,在这个通道中设计有发送与接受的规则。

锁与通道

阻塞规则

同步阻塞

无缓存区,数据在发送后,只有在被接收的情况下,才能继续再次发送数据。

同步阻塞

异步非阻塞

有缓冲区,在缓冲区未满的情况下,可继续发送数据,是非阻塞状态;而在缓冲区满的情况下是阻塞状态。

异步非阻塞或阻塞

基本使用

通过chan类型声明通道:

package main

import "fmt"

func main() {
// 声明创建一个channel
c := make(chan int)

go func() {
// 把1推入或发送到通道
c <- 1
}()

// 从通道中接收数据
value := <-c

fmt.Println(value)
}
无缓冲区

考虑如下程序的输出顺序:

package main

import (
"fmt"
"time"
)

func main() {
// c := make(chan int)

// go func() {
// fmt.Println("Sending 1") // 1
// c <- 1
// fmt.Println("Sending 2") // 2
// c <- 2
// fmt.Println("Sending 3") // 5
// c <- 3
// }()

// v1 := <-c
// fmt.Println("Received", v1) // 3
// v2 := <-c
// fmt.Println("Received", v2) // 4
// v3 := <-c
// fmt.Println("Received", v3) // 6

c := make(chan int)
s := []int{1, 2, 3, 4, 5}

go func() {
defer close(c)
for _, v := range s {
fmt.Println("Sending", v)
c <- v
time.Sleep(1 * time.Second)
}
}()
time.Sleep(5 * time.Second)
for v := range c {
fmt.Println(v)
}
}

在无缓冲区的情况下,数据只能发送一个接收一个,因此如果接收不及时,可能存在阻塞的情况。

有缓冲区
package main

import (
"fmt"
"time"
)

func main() {
// 有缓冲区
c := make(chan int, 5)

s := []int{1, 2, 3, 4, 5}

go func() {
defer close(c)
for _, v := range s {
fmt.Println("Sending", v)
c <- v
time.Sleep(1 * time.Second)
}
}()

time.Sleep(5 * time.Second)

for v := range c {
fmt.Println(v)
}
}
/**
Sending 1
Sending 2
Sending 3
Sending 4
Sending 5
1
2
3
4
5
*/
close

如果Channel永远不关闭,就会造成死锁的错误,GMP在死锁状态下,会将所有的G设置为休眠状态。

发送方关闭Channel。

package main

import "fmt"

func main() {
c := make(chan int)

go func() {
// defer close(c)
c <- 1
}()

v1, ok1 := <-c
// fatal error: all goroutines are asleep - deadlock!
v2, ok2 := <-c

fmt.Println(v1, ok1)
fmt.Println(v2, ok2)
}

接收值的语法v, ok := <-channel有两个返回值:发送的数据与接收到的是否是发送的数据。因此可通过ok来判定接收到的数据是不是另外的G发送的数据,如果不是,则v为对应类型的默认值,

Channel被关闭后,还可以接收数据,但是不能发送数据。如果在关闭的情况下继续发送数据,运行期间会报错。

单向Channel

一个Channel只能发送或者只能接收,可限制双向Channel的操作。

单向Channel完全封闭了Channel的发送或接收功能,业务需求在大部分情况下是无法满足的。

仅发送

一个只能发送数据的Channel。

package main

import "fmt"

func main() {
c := make(chan<- int)
c <- 1
// invalid operation: cannot receive from send-only channel c
v := <-c
}
仅接收

一个只能接收数据的Channel。

package main

import "fmt"

func main() {
c := make(<-chan int)
// invalid operation: cannot send to receive-only channel c
c <- 1
}

死锁场景

阻塞并且无法解开的情况或是程序锁定了。

情况一

当有或者没有缓冲区的Channel没有发送方,如果直接接收会导致死锁。

package main

import "fmt"

func main() {
c := make(chan int)
// fatal error: all goroutines are asleep - deadlock!
<-c
}
情况二

无缓冲区的Channel没有接收方会导致死锁。因为如果没接收方,Channel的数据就无法被消费,是一种无法解开的阻塞。

package main

import "fmt"

func main() {
c := make(chan int)
// fatal error: all goroutines are asleep - deadlock!
c <- 1
}

有缓冲区但缓冲区未满的Channel,如果没有接收方,不会导致死锁。因为Channel里有缓冲区数组保存这些数组,不会死锁。

package main

import "fmt"

func main() {
c := make(chan int, 5)
c <- 1
}
情况三

Channel有缓冲区,但缓冲区已满,如果继续推送数据到Channel,在没有接收方的情况下会导致死锁。

package main

import "fmt"

func main() {
c := make(chan int, 2)
c <- 1
c <- 2
// fatal error: all goroutines are asleep - deadlock!
c <- 3 // 阻塞
}
情况四

cnil且没有缓冲区和接收方,此时通道还没有创建,数据无法推入,因此会导致死锁。

package main

import "fmt"

func main() {
var c chan int
c <- 1
}
情况五

for range接收Channel数据且发送方没有关闭Channel会造成死锁。

package main

import "fmt"

func main() {
var wg sync.WaitGroup
c := make(chan int, 5)

wg.Add(2)

go func(c chan<- int) {
defer wg.Done()
// defer close(c)
for i := 0; i < 10; i++ {
fmt.Printf("Sending %d \r\n", i)
c <- i
}
}(c)

time.Sleep(3 * time.Second)

go func(c <-chan int) {
defer wg.Done()
for v := range c {
fmt.Println("Received ", v)
}
}(c)

wg.Wait()
}

select

select语句允许一个Goroutine在多个通信操作上等待。

select语句与switch语句类似,但在select语句中,case语句涉及通信,即在通道上进行发送或接收操作。

基本语法

基本语法如下:

select {
case SendOrReceive1: // Statement
case SendOrReceive2: // Statement
case SendOrReceive3: // Statement
.......
default: // Statement
}
阻塞运行

select会阻塞,直到其中一个case可以运行,然后执行该case

package main

import (
"fmt"
"time"
)

func portal1(c chan string) {
time.Sleep(1 * time.Second)
c <- "Welcome to channel 1"
}

func portal2(c chan string) {
time.Sleep(2 * time.Second)
c <- "Welcome to channel 2"
}

func main() {
R1 := make(chan string)
R2 := make(chan string)

go portal1(R1)
go portal2(R2)

select {
case op1 := <-R1:
fmt.Println(op1)
case op2 := <-R2:
fmt.Println(op2)
}
}
case

如果一个select语句不包含任何的case语句,那么select语句会一直等待:

package main 

func main() {
// fatal error: all goroutines are asleep - deadlock!
select{ }
}
随机性

如果有多个case同时满足条件,那么会随机选择其中一个执行:

package main

import (
"fmt"
)

func portal1(c chan string) {
c <- "Welcome to channel 1"
}

func portal2(c chan string) {
c <- "Welcome to channel 2"
}

func main() {
R1 := make(chan string)
R2 := make(chan string)

go portal1(R1)
go portal2(R2)

select {
case op1 := <-R1:
fmt.Println(op1)
case op2 := <-R2:
fmt.Println(op2)
}
}
循环监听

select监听一次后会直接结束,可使用for进行循环等待:

package main

import (
"fmt"
"time"
)

func main() {
c1 := make(chan string)
c2 := make(chan string)

go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()

go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()

for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
}

如上,会进行2select的监听,因此两个Goroutine的读取操作均可监听到。

监听发送

select也可用来监听发送数据:

package main

import "fmt"

func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}

func main() {
c := make(chan int)
quit := make(chan int)

go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()

fibonacci(c, quit)
}

如上,case c <- x表示Goroutine尝试将变量x的值发送到通道c,如果发送成功则执行该case,否则将继续等待监听。

default

一旦Channel阻塞,那么一定会走default

package main

import (
"fmt"
"time"
)

func main() {
var wg sync.WaitGroup
c := make(chan int)

wg.Add(2)

go func() {
defer wg.Done()
for {
select {
case c <- 1:
fmt.Println("不阻塞")
default:
fmt.Println("阻塞")
}
time.Sleep(2 * time.Second)
}
}()

go func() {
defer wg.Done()
fmt.Println(<-c)
}()

wg.Wait()
}

基本原理

Channel是一个hchanq结构体,结构如下:

type hchan struct {
// total data in the queue
qcount uint

// size of the circular queue
dataqsiz uint

// points to an array of dataqsiz elements
buf unsafe.Pointer

// element size
elemsize uint16

// channel closed flag
closed uint32

// element type
elemtype *_type

// send index
sendx uint

// receive index
recvx uint

// list of recv waiters (blocking goroutines)
recvq waitq

// list of send waiters (blocking goroutines)
sendq waitq

lock mutex
}

基本原理

阻塞与不阻塞

Channel的读取操作不同的场景可能产生阻塞或不阻塞:

不阻塞
  1. 缓冲区有数据。
  2. 缓冲区满了或没有缓冲区,但是sendq有待发送数据的Goroutine。
阻塞
  1. Channel是nil
  2. Chaneel没有缓冲区,sendq也没有等待的Goroutine。
  3. 有缓冲区但是缓存区没有数据,并且sendq也没有等待的Goroutine。

close

  1. Channel只声明了,并未初始化,关闭时会报错。
package main

var c chan string

func main() {
// panic: close of nil channel
close(c)
  1. 不能多次关闭同一个Channel。
package main

var c = make(chan string)

func main() {
close(c)
// panic: close of closed channel
close(c)
}
  1. 已关闭的Channel的再次发送数据会报错。Channel关闭以后会唤醒所有等待的Goroutine并放到一个glist链表,然后遍历该链表,遇到写入操作的Goroutine会panic,遇到读取操作的Goroutine会继续执行。
package main

import "time"

func main() {
var c = make(chan string, 3)

go func() {
c <- "a"
c <- "b"
close(c)
// panic: send on closed channel
c <- "c"
}()

time.Sleep(2 * time.Second)
}