并发
什么是并发
并发意味着无序执行,将一组本应按顺序执行的指令以无序方式执行,但仍然产生相同的结果。
并发与并行并不相同,并行意味着同时执行两个或更多指令,这与并发是不同的概念。当至少有 2 个核心和硬件线程可用,并且至少有 2 个 Goroutine,每个 Goroutine 在各自的操作系统/硬件线程上独立执行指令时,并行才有可能实现。

如图1,两个逻辑处理器(P)的示意图,每个逻辑处理器(P)都有其独立的操作系统线程(M),并连接到机器上的独立硬件线程(Core)。可以看到两个 Goroutines(G1 和 G2)正在并行执行,同时在各自的硬件线程上执行它们的指令。在每个逻辑处理器内,三个 Goroutines 轮流共享各自的操作系统线程。所有这些 Goroutines 都在并发运行,以无特定顺序执行它们的指令,并共享操作系统线程的时间。
问题在于,有时利用并发而不使用并行实际上会降低吞吐量。有趣的是,有时利用并发和并行并不会带来你预期中更大的性能提升。
工作负载
了解正在处理的工作负载类型是一必要的,在考虑并发性时,有两种类型的工作负载是重要的。
- CPU受限:从不让 Goroutines 自然进入和退出等待状态的工作负载,不断进行计算的工作。
- IO受限:使 Goroutine 自然进入等待状态的工作负载。通常包括网络请求访问资源、向操作系统发出系统调用、等待事件、同步事件等。
对于 CPU 密集型工作负载,通常需要并行性来利用并发性。单个操作系统/硬件线程处理多个 Goroutine 并不高效,因为这些 Goroutine 在其工作负载中并未进入和退出等待状态。Goroutine 数量多于操作系统/硬件线程的情况可能会因为将 Goroutine 移入和移出操作系统线程的延迟成本(所需时间)而减慢工作负载的执行速度。上下文切换会为你的工作负载创建一个"停止世界"事件,因为在切换期间工作负载无法被执行,而本可以被执行。
对于 IO 密集型工作负载,不需要并行性来实现并发。单个操作系统/硬件线程可以高效地处理多个 Goroutine,因为 Goroutine 在其工作负载中会自然地进入和退出等待状态。拥有比操作系统/硬件线程更多的 Goroutine 可以加速工作负载的执行,因为将 Goroutine 移入和移出操作系统线程的延迟成本不会导致"停止世界"事件。工作负载会自然地停止,这允许另一个 Goroutine 高效地利用同一个操作系统/硬件线程,而不是让操作系统/硬件线程处于空闲状态。
案例一
场景
add 函数是否适合乱序执行的工作负载?答案是肯定的。整数集合可以分解成更小的列表,这些列表可以并发处理,一旦所有较小的列表求和完毕,这些和可以加在一起,产生与顺序版本相同的答案。
那么应该创建和独立处理多少个较小的列表才能获得最佳吞吐量?add 函数正在执行 CPU 密集工作,因为算法正在执行纯数学运算,并且没有任何操作会导致 Goroutine 进入自然等待状态,因此每个 OS/硬件线程使用一个 Goroutine 就足以获得良好的吞吐量。
- add.go
- add_concurrent.go
func add(numbers []int) int {
var v int
for _, n := range numbers {
v += n
}
return v
}
func addConcurrent(goroutines int, numbers []int) int {
var v int64
totalNumbers := len(numbers)
lastGorontine := goroutines - 1
step := totalNumbers / goroutines
var wg sync.WaitGroup
wg.Add(goroutines)
for g := 0; g < goroutines; g++ {
go func(g int) {
start := g * step
end := start + step
if g == lastGorontine {
end = totalNumbers
}
var lv int
for _, n := range numbers[start:end] {
lv += n
}
atomic.AddInt64(&v, int64(lv))
wg.Done()
}(g)
}
wg.Wait()
return int(v)
}
并发版本肯定比顺序版本更复杂,但这种复杂性值得吗?为了得到结果,需要做一个基准测试。对于这些基准测试,使用了一个包含 1000 万个数字的集合,并关闭了垃圾回收器。
基准测试
package main
import (
"runtime"
"testing"
"math/rand"
)
const N = 1000
var numbers = generateList(1e7)
func BenchmarkSequential(b *testing.B) {
b.N = N
for i := 0; i < b.N; i++ {
add(numbers)
}
}
func BenchmarkConcurrent(b *testing.B) {
b.N = N
for i := 0; i < b.N; i++ {
addConcurrent(runtime.NumCPU(), numbers)
}
}
func BenchmarkSequentialAgain(b *testing.B) {
b.N = N
for i := 0; i < b.N; i++ {
add(numbers)
}
}
func BenchmarkConcurrentAgain(b *testing.B) {
b.N = N
for i := 0; i < b.N; i++ {
addConcurrent(runtime.NumCPU(), numbers)
}
}
func generateList(totalNumbers int) []int {
numbers := make([]int, totalNumbers)
for i := 0; i < totalNumbers; i++ {
numbers[i] = rand.Intn(totalNumbers)
}
return numbers
}
顺序版本使用 1 个 Goroutine,而并发版本使用 runtime.NumCPU() 个 Goroutine,此这种情况下,并发版本利用了并发性但没有并行性。
结果分析
使用单核运行后,测试结果大致如下:
- Result
- Testing CMD
/*
10 Million Numbers using 8 goroutines with 1 core
Apple M1 Max 10 Cores
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
goos: darwin
goarch: arm64
pkg: basic5
cpu: Apple M1 Max
BenchmarkSequential 1000 3110058 ns/op ---> ~20% Faster
BenchmarkConcurrent 1000 3909370 ns/op
BenchmarkSequentialAgain 1000 3113586 ns/op ---> ~21% Faster
BenchmarkConcurrentAgain 1000 3967447 ns/op
*/
GOGC=off go test -cpu 1 -run none -bench .
如上结果,可以看到,在仅使用一个核心时,顺序版本比并发版本快大约 20% 到 21%。这是预料之中的,因为并发版本在该单一核心线程上有上下文切换的开销以及 Goroutine 的管理开销。
如果使用多核心运行,这种情况下,并发版本利用了并发和并行性,那么结果会完全不一样:
- Result
- Testing CMD
/*
10 Million Numbers using 8 goroutines with 1 core
Apple M1 Max 10 Cores
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
goos: darwin
goarch: arm64
pkg: basic5
cpu: Apple M1 Max
BenchmarkSequential-10 1000 3111412 ns/op
BenchmarkConcurrent-10 1000 1001474 ns/op ---> ~68% Faster
BenchmarkSequentialAgain-10 1000 3227339 ns/op
BenchmarkConcurrentAgain-10 1000 1004731 ns/op ---> ~69% Faster
*/
GOGC=off go test -cpu 10 -run none -bench .
如上结果,可以看到,在使用 10 个核心,并发版本比顺序版本快大约 68% 到 69%。这正是所预期的,因为所有 Goroutine 现在都在并行运行,10个 Goroutines 同时执行它们的并发工作。
案例二
此案例以读取文件并执行文本搜索为示例,测试 IO 密集型工作负载。
package main
import (
"encoding/xml"
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)
func main() {
docs := generateList(1e3)
fmt.Println(find("Go", docs))
fmt.Println(findConcurrent(runtime.NumCPU(), "Go", docs))
}
func generateList(totalDocs int) []string {
docs := make([]string, totalDocs)
for i := 0; i < totalDocs; i++ {
docs[i] = "test.xml"
}
return docs
}
func read(doc string) ([]item, error) {
time.Sleep(time.Millisecond) // Simulate blocking disk read.
var d document
if err := xml.Unmarshal([]byte(file), &d); err != nil {
return nil, err
}
return d.Channel.Items, nil
}
func find(topic string, docs []string) int {
var found int
for _, doc := range docs {
items, err := read(doc)
if err != nil {
continue
}
for _, item := range items {
if strings.Contains(item.Description, topic) {
found++
}
}
}
return found
}
func findConcurrent(goroutines int, topic string, docs []string) int {
var found int64
ch := make(chan string, len(docs))
for _, doc := range docs {
ch <- doc
}
close(ch)
var wg sync.WaitGroup
wg.Add(goroutines)
for g := 0; g < goroutines; g++ {
go func() {
var lFound int64
for doc := range ch {
items, err := read(doc)
if err != nil {
continue
}
for _, item := range items {
if strings.Contains(item.Description, topic) {
lFound++
}
}
}
atomic.AddInt64(&found, lFound)
wg.Done()
}()
}
wg.Wait()
return int(found)
}
var file = `<?xml version="1.0" encoding="UTF-8"?>
<rss>
<channel>
<title>Going Go Programming</title>
<description>Golang : https://github.com/goinggo</description>
<link>http://www.goinggo.net/</link>
<item>
<pubDate>Sun, 15 Mar 2015 15:04:00 +0000</pubDate>
<title>Object Oriented Programming Mechanics</title>
<description>Go is an amazing language.</description>
<link>http://www.goinggo.net/2015/03/object-oriented</link>
</item>
</channel>
</rss>`
type item struct {
XMLName xml.Name `xml:"item"`
Title string `xml:"title"`
Description string `xml:"description"`
Link string `xml:"link"`
}
type channel struct {
XMLName xml.Name `xml:"channel"`
Title string `xml:"title"`
Description string `xml:"description"`
Link string `xml:"link"`
PubDate string `xml:"pubDate"`
Items []item `xml:"item"`
}
type document struct {
XMLName xml.Name `xml:"rss"`
Channel channel `xml:"channel"`
URI string
}
测试结果
单核:
/*
goos: darwin
goarch: arm64
pkg: basic5
cpu: Apple M1 Max
BenchmarkSequential 3 1273483639 ns/op
BenchmarkConcurrent 20 123174867 ns/op ---> ~90% Faster
BenchmarkSequentialAgain 3 1269836431 ns/op
BenchmarkConcurrentAgain 20 122210204 ns/op ---> ~90% Faster
*/
并发版本比顺序版本快约 90%。这是符合预期的,因为所有 Goroutine 都在高效地共享 Machine。每个 Goroutine 在 read 调用时发生的自然上下文切换使得在 Machine 上随着时间的推移完成更多工作。
多核:
/*
goos: darwin
goarch: arm64
pkg: basic5
cpu: Apple M1 Max
BenchmarkSequential-10 3 1302326097 ns/op
BenchmarkConcurrent-10 20 133926031 ns/op ---> ~90% Faster
BenchmarkSequentialAgain-10 3 1267427764 ns/op
BenchmarkConcurrentAgain-10 20 125634300 ns/op ---> ~90% Faster
*/
基于基准测试结果,额外的核心处理并未提供更好的性能。
总结
对于 IO 密集型工作负载,并不需要并行性就能显著提升性能。这与 CPU 密集型工作负载的情况正好相反。对于像冒泡排序这样的算法,使用并发会增加复杂性,而不会带来任何实际的性能收益。确定工作负载是否适合并发,并识别工作负载的类型以使用正确的语义是很重要的。