Go并发模型

语句执行规则

Don't communicate by sharing memory; share memory by communicating.

不要通过共享数据来通讯,以通讯的方式来共享数据。

channel类型的值,被用来以通讯的方式共享数据。一般被用来在不同的goroutine(代表并发编程模型中的用户级线程)之间传递数据。

调度

调度时机

  • 就绪态 -> 运行态:当进程被创建时,会进入到就绪队列,操作系统会从就绪队列选择一个进程运行

  • 运行态 -> 阻塞态:当进程发生 I/O 事件而阻塞时,操作系统必须另外一个进程运行

  • 运行态 -> 结束态:当进程退出结束后,操作系统得从就绪队列选择另外一个进程运行

调度原则

  • CPU 利用率:调度程序应确保 CPU 是始终匆忙的状态,这可提高 CPU 的利用率

  • 系统吞吐量:吞吐量表示的是单位时间内 CPU 完成进程的数量,长作业的进程会占用较长的 CPU 资源,因此会降低吞吐量,相反,短作业的进程会提升系统吞吐量

  • 周转时间:周转时间是进程运行和阻塞时间总和,一个进程的周转时间越小越好

  • 等待时间:这个等待时间不是阻塞状态的时间,而是进程处于就绪队列的时间,等待的时间越长,用户越不满意

  • 响应时间:用户提交请求到系统第一次产生响应所花费的时间,在交互式系统中,响应时间是衡量调度算法好坏的主要标准

调度算法

  • 抢占式

  • 非抢占式

单核系统

  • 先来先服务调度算法

  • 最短作业优先调度算法

  • 高响应比优先调度算法

  • 时间片轮转调度算法

  • 最高优先级调度算法

  • 多级反馈队列调度算法

并发模型

并发模型其实和分布式系统模型非常相似,在并发模型中是线程彼此进行通信,而在分布式系统模型中是 进程 彼此进行通信。然而本质上,进程和线程也非常相似。

  • 分布式系统通常要比并发系统面临更多的挑战和问题比如进程通信、网络可能出现异常,或者远程机器挂掉等

  • 并发模型同样面临着比如 CPU 故障、网卡出现问题、硬盘出现问题等

并行worker

这些共享状态可能会使用一些工作队列来保存业务数据、数据缓存、数据库的连接池等。

在线程通信中,线程需要确保共享状态是否能够让其他线程共享,而不是仅仅停留在 CPU 缓存中让自己可用,当然这些都是程序员在设计时就需要考虑的问题。

多线程在访问共享数据时,会丢失并发性,因为操作系统要保证只有一个线程能够访问数据,这会导致共享数据的争用和抢占。未抢占到资源的线程会阻塞

流水线(事件驱动系统)

Actor模型

在 Actor 模型中,每一个 Actor 其实就是一个 Worker, 每一个 Actor 都能够处理任务。

Actor 模型是一个并发模型,它定义了一系列系统组件应该如何动作和交互的通用规则。一个参与者Actor对接收到的消息做出响应,然后可以创建出更多的 Actor 或发送更多的消息,同时准备接收下一条消息。

Actor 模型重在参与交流的实体(即进程),而 CSP 重在交流的通道,如 Go 中的 channel。

Channels模型

也叫CSP(Communicating sequential processes)。

在 Channel 模型中,worker 通常不会直接通信,与此相对的,他们通常将事件发送到不同的 通道(Channel)上,然后其他 worker 可以在这些通道上获取消息。

有的时候 worker 不需要明确知道接下来的 worker 是谁,他们只需要将结果写入通道中,监听 Channel 的 worker 可以订阅或者取消订阅,这种方式降低了 worker 和 worker 之间的耦合性。

与 Actor 相比,CSP 最大的优点是灵活性。Actor 模型,负责通信的媒介和执行单元是耦合的。而 CSP 中,channel 是第一类对象,可以被独立创造、写入、读出数据,也可以在不同执行单元中传递。

CSP 模型也易受死锁影响,且没有提供直接的并行支持。并行需要建立在并发基础上,引入了不确定性。

CSP 模型不关注发送消息的进程,而是关注发送消息时使用的 channel,而 channel 不像 Actor 模型那样进程与队列紧耦合。而是可以单独创建和读写,并在进程 (goroutine) 之间传递。

Golang Runtime

调度器

Go语言拥有:

  • 独特的并发编程模型

  • 用户级线程goroutine

  • 强大的用于调度goroutine、对接操作系统的调度器

这个调度器是Go语言运行时系统的重要组成部分,主要负责统筹调配Go并发编程模型中的三个主要元素:

  • G(goroutine):协程是一种用户级线程,属于轻量级线程,是对代码片段的封装,拥有执行时的栈、状态和代码片段等信息

  • P(processor):代表 Go 代码片段执行所需的上下文环境。M 和 P 的结合能够为 G 提供有效的运行环境,它们之间的结合关系不是固定的。P 的最大数量决定了 Go 程序的并发规模,由 runtime.GOMAXPROCS 变量决定

  • M(machine):相当于内核线程在 Go 进程中的映射,它与内核线程一一对应,代表真正执行计算的资源。在 M 的生命周期内,它只会与一个内核线程关联。

协程的调度,完全由用户空间的代码控制;协程拥有自己的寄存器上下文和栈,并存储在用户空间;协程切换时无须切换到内核态访问内核空间,切换速度极快。但这也给开发人员带来较大的技术挑战:开发人员需要在用户空间处理协程切换时上下文信息的保存和恢复、栈空间大小的管理等问题。

宏观上讲,由于P的存在,用户级线程和系统级线程可以呈现多对多的关系。

在实际执行过程中,M 和 P 共同为 G 提供有效的运行环境(如下图),多个可执行的 G 顺序挂载在 P 的可执行 G 队列下面,等待调度和执行。当 G 中存在一些 I/O 系统调用阻塞了 M 时,P 将会断开与 M 的联系,从调度器空闲 M 队列中获取一个 M 或者创建一个新的 M 组合执行, 保证 P 中可执行 G 队列中其他 G 得到执行,且由于程序中并行执行的 M 数量没变,保证了程序 CPU 的高利用率。

当 G 中系统调用执行结束返回时,M 会为 G 捕获一个 P 上下文,如果捕获失败,就把 G 放到全局可执行 G 队列等待其他 P 的获取。新创建的 G 会被放置到全局可执行 G 队列中,等待调度器分发到合适的 P 的可执行 G 队列中。M 和 P 结合后,会从 P 的可执行 G 队列中无锁获取 G 执行。当 P 的可执行 G 队列为空时,P 才会加锁从全局可执行 G 队列获取 G。当全局可执行 G 队列中也没有 G 时,P 会尝试从其他 P 的可执行 G 队列中“剽窃” G 执行。

因为调度器帮我们做了很多事,所以Go程序才能高效地利用操作系统和计算机资源。程序中所有的用户级线程都会被充分地调度,其中的代码也都会并发地运行,即使用户级线程有数十万计。

  • 全局队列(Global Queue):存放等待运行的G

  • P的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建G'时,G'优先加入到P的本地队列,如果队列满了,则会把本地队列中一半的G移动到全局队列

  • P列表:所有的P都在程序启动时创建,并保存在数组中,最多有GOMAXPROCS(可配置)个

  • M:线程想运行任务就得获取P,从P的本地队列获取G,P队列为空时,M也会尝试从全局队列拿一批G放到P的本地队列,或从其他P的本地队列偷一半放到自己P的本地队列。M运行G,G执行之后,M会从P获取下一个G,不断重复下去

P和M的数量

  • P的数量:由启动时环境变量$GOMAXPROCS或者是由runtime的方法GOMAXPROCS()决定。这意味着在程序执行的任意时刻都只有$GOMAXPROCS个goroutine在同时运行。

  • M的数量:

    1. go语言本身的限制:go程序启动时,会设置M的最大数量,默认10000。但是内核很难支持这么多的线程数,所以这个限制可以忽略

    2. runtime/debug中的SetMaxThreads函数,设置M的最大数量

    3. 一个M阻塞了,会创建新的M

Go的并发模型

并发程序中的多个线程同时在 CPU 执行,由于资源之间的相互依赖和竞态条件,需要一定的并发模型协作不同线程之间的任务执行。Go 中倡导使用 CSP 并发模型来控制线程之间的任务协作,类似同步队列,更加关注消息的传输方式,CSP 倡导使用通信的方式来进行线程之间的内存共享,解耦了消息的发送和接收方。

channel 是 go 在并发编程通信的推荐手段,可以独立创建和存取,在不同的协程中传递使用,Go 语言推荐使用通信来进行进程间同步消息。这样做有三点好处:

  • 首先,使用发送消息来同步信息相比于直接使用共享内存和互斥锁是一种更高级的抽象,使用更高级的抽象能够为我们在程序设计上提供更好的封装,让程序的逻辑更加清晰

  • 其次,消息发送在解耦方面与共享内存相比也有一定优势,我们可以将线程的职责分成生产者和消费者,并通过消息传递的方式将它们解耦,不需要再依赖共享内存

  • 最后,Go 语言选择消息发送的方式,通过保证同一时间只有一个活跃的线程能够访问数据,能够从设计上天然地避免线程竞争和数据冲突的问题

并发控制

  • sync.WaitGroup:某任务需要多 goroutine 协同工作,每个 goroutine 只能做该任务的一部分,只有全部的 goroutine 都完成,任务才算是完成

  • channel+select:比较优雅的通知一个 goroutine 结束;多groutine中数据传递

  • context:多层级groutine之间的信号传播(包括元数据传播,取消信号传播、超时控制等),优雅的解决了 goroutine 启动后不可控的问题

goroutine最佳实践

package main

import "fmt"

func main() {
    for i := 0; i < 10; i++ {
        go func() {
            fmt.Println(i)
        }()
    }
}

代码执行后,不会有任何内容输出。

与一个进程总会有一个主线程类似,每一个独立的Go程序运行起来总会有一个主用户线程(goroutine)。这个主goroutine会在Go程序的运行准备工作完成后被自动地启用,并不需要任何手动操作。

每条go语句一般都会携带一个函数调用,这个被调用的函数被称为go函数,主用户线程(goroutine)的go函数,就是那个程序入口的main函数。

已经存在的用户级线程会被优先复用

go函数被真正执行的时间,总会与其所属的go语句被执行的时间不同。当程序执行到一条go语句,Go语言运行时系统,会先试图从某个存放空闲的用户级线程的队列中获取某个用户级线程,它只有找不到空闲的用户级线程的情况们才会去创建一个新的用户级线程。

用户级线程的创建成本很低

创建一个新的用户级线程并不会像创建一个进程或者一个系统级线程那样,必须通过操作系统的系统调用来完成,在Go语言的运行时系统内部就可以完成了,一个用户级线程就相当于需要并发执行代码片段的上下文环境。

在拿到空闲的用户级线程之后,Go语言运行时系统会用这个用户级线程去包装那个go函数(函数中的代码),然后再把这个用户级线程追加到某个可运行的用户级线程队列(先进先出)中。虽然在队列中被安排运行的时间很快,上述的准备工作也不可避免,因此存在一定时间消耗。所以go函数的执行时间,总是会明显滞后(相对于CPU和Go程序)于go语句的执行时间

只要go语句本身执行完毕,Go程序完全不用等待go函数的执行,它会立刻去执行后面的语句,这就是异步并发执行

注意:一旦主用户级线程(main函数中的那些代码)执行完毕,当前的Go程序就会结束运行。如果在Go程序结束的那一刻,还有用户级线程没有运行,那就没有机会运行了。

严格的说,Go语言并不会保证用户级线程会以怎样的顺序运行,因为主用户级线程会与手动启动的其他用户级线程一起接受调度,又因为调度器很可能会在用户级线程中的代码只执行了一部分的时候暂停,以期所有的用户级线程有更公平的运行机会。所以哪个用户级线程先执行完,是不可预知的,除非使用了某种Go语言提供的方式进行人为干预。

主用户级线程等待其他用户级线程

  1. 让主用户级线程Sleep()一会:但是时间难以把握

  2. 其他用户级线程运行完毕之后发出通知:创建一个通道,长度与手动启动的用户级线程一致,每个用户级线程运行完毕的时候向通道中发送一个值(在go函数的最后发送),在main函数的最后接收通道中的值,接收次数与手动启动的用户级线程数量一致

  3. sync包中的sync.WaitGroup类型

sign := make(chan struct{}, num)        // 结构体类型的通道

sign <- struct{}{}

<- sign

struct{}类似于空接口interface{},代表既不包含任何字段也不拥有任何方法的空结构体类型。

struct{}类型的值的表示方法只有一个:struct{}{},它占用的内存空间是0字节。这个值在整个Go程序中永远都只会存在一份。无数次的这个值的字面量,但是用到的却是同一个值。

用户级线程按顺序执行

package main

import (
            "fmt"
            "sync/atomic"
            "time"
        )

func main() {
    // 用户级线程随机执行
    for i := 0; i < 10; i++ {
        go func() {
            fmt.Println(i)
        }()
    }

    // 用户级线程按顺序执行
    for i := 0; i < 10; i++ {
        go func(i int) {     // 让go函数接收一个int型参数,在调用它的时候,把变量传进去
            fmt.Println(i)   // 这样Go语言保证每个用户级线程都可以拿到一个唯一的整数
        }(i)
    }

    // 在go语句被执行时,传给go函数的参数`i`会被先求值,如此就得到了当次迭代的序号,
    // 之后,无论go函数会在什么时候执行,这个参数值都不会变,
    // 也就是go函数中调用`fmt.Prinrln`函数打印的一定是那个当次迭代的序号。

    var count uint32

    for i := uint32(0); i < 10; i++ {
        go func(i uint32) {
            fn := func() {
                fmt.Println(i)
            }
            trigger(i, fn)
        }(i)
    }

    trigger := func(i uint32, fn func()) {
        for {
            if n := atomic.LoadUint32(&count); n == i {     // 原子操作
                fn()
                atomic.AddUint32(&count, 1)                 // 原子操作
                break
            }
            time.Sleep(time.Nanosecond)
        }
    }

    trigger(10, func(){})
}

系统调用

Go 会优化系统调用(无论阻塞与否),通过运行时封装它们。封装的那一层会把 P 和线程 M 分离,并且可以让另一个用户线程在它上面运行。下面以文件读取举例:

func main() {
   buf := make([]byte, 0, 2)

   fd, _ := os.Open("number.txt")
   fd.Read(buf)
   fd.Close()

   println(string(buf)) // 42
}

文件读取的流程如下:

P0 现在在空闲 list 中,有可能被唤醒。当系统调用 exit 时,Go 会遵守下面的规则,直到有一个命中了。

  • 尝试去捕获相同的 P,在我们的例子中就是 P0,然后 resume 执行过程

  • 尝试从空闲 list 中捕获一个 P,然后 resume 执行过程

  • 把 G 放到全局队列里,把与之相关联的 M 放回空闲 list 去

然而,像 http 请求之类的非阻塞I/O情形下,Go在资源没有准备好时也会处理请求。在这种情形下,第一个系统调用遵循上述流程图,由于资源还没有准备好所以不会成功,(这样就)迫使 Go 使用 network poller 并使协程停驻,如下示例:

func main() {
   http.Get(`https://httpstat.us/200`)
}

当第一个系统调用完成且显式地声明了资源还没有准备好,G 会在 network poller 通知它资源准备就绪之前一直处于停驻状态。在这种情形下,线程 M 不会阻塞:

在 Go 调度器在等待信息时 G 会再次运行。调度器在获取到等待的信息后会询问 network poller 是否有 G 在等待被运行。

如果多个协程都准备好了,只有一个会被运行,其他的会被加到全局的可运行队列中,以备后续的调度。

系统线程方面的限制

在系统调用中,Go 不会限制可阻塞的 OS 线程数。

GOMAXPROCS 变量表示可同时运行用户级线程的操作系统线程的最大数量。系统调用中可被阻塞的最大线程数并没有限制;可被阻塞的线程数对 GOMAXPROCS 没有影响。这个会自动将GOMAXPROCS设置为与Linux容器CPU配额(如果有)匹配的值。或者使用runtime包中的GOMAXPROCS函数设置。

如下示例:

func main() {
   var wg sync.WaitGroup

   for i := 0;i < 100 ;i++  {
      wg.Add(1)

      go func() {
         http.Get(`https://httpstat.us/200?sleep=10000`)

         wg.Done()
      }()
   }

   wg.Wait()
}

利用追踪工具得到的线程数如下:

由于 Go 优化了系统线程使用,所以当 G 阻塞时,它仍可复用,这就解释了为什么图中的数跟示例代码循环中的数不一致。

通道

Don't communicate by sharing memory;share memory bu communicating. 不要通过共享内存来通信,而应该通过通信来共享内存。

通道与goroutine共同代表Go语言独有的并发编程模式和编程哲学,利用通道在多个goroutine之间传递数据。

通道类型的值,本身就是并发安全的。这是Go语言自带的唯一一个可以满足并发安全性的类型。

在声明并初始化一个通道的时候,需要使用Go内建函数make(),传给这个函数的第一个参数应该是代表了通道的具体类型的类型字面量,第二个参数是一个int类型的值,不能小于0,表示通道的容量(该参数可选)。

  • 当容量 = 0 ,表示非缓冲通道

  • 当容量 > 0 ,表示缓冲通道

非缓冲通道和缓冲通道有不同的数据传递方式。

声明一个通道类型变量的时候,首先要确定该通道类型的元素类型,这决定了通过这个通道传递声明类型的数据。

var name chan T // 双向 channel
var name chan <- T // 只能发送消息的 channel
var name T <- chan // 只能接收消息的 channel
// chan 表示通道类型的关键字
// T 说明该通道类型的元素类型

ch1 := make(chan int,3)

ch1 <- val // 发送消息
val := <- ch1 // 接收消息
val, ok := <- ch1 // 非阻塞接收消息
// 无论channel中是否存在消息都会立即返回,通过ok布尔值判断是否接收成

一个通道相当一个FIFO队列,通道中各个元素严格按照发送顺序排列,元素值的发送和接收都用到操作符<-,称为接送操作符,该符号形象的表示了元素值的传输方向。

对发送与接收操作的基本特性:

  1. 对同一个通道,发送操作之间是互斥的,接收操作之间也是互斥的:

    在同一时刻,Go语言运行时系统只会执行对同一个通道的任意个发送操作中的某一个。直到这个元素值被完全复制进该通道之后,其他针对该通道的发送操作才可能被执行。接收操作也是一样的,即使操作是并发执行的也是如此。

    对于通道内的同一个元素值,发送操作和接收操作之间也是互斥的。即使一个正在被复制进通道但还未复制完成的元素值,也绝不会被想接收它的一方看到和取走。

    元素值从外界进入通道时是被复制,即进入通道的并不是接收操作符右边的那个元素值,而是它的副本

    元素从通道进入外界时会被移动:

    1. 生成正在通道中的这个元素值的副本,并准备给到接收方

    2. 删除在通道中的这个元素值

  2. 发送操作和接收操作中对元素值的处理都是不可分割的:

    不可分割表示处理通道中元素的操作是一个原子操作:

    1. 发送操作要么没复制值,要么已经复制完毕

    2. 接收操作在准备好元素值副本之后,一定会删除掉通道中的原值,绝不会出现有残留的情况

  3. 发送操作和接收操作在完全完成之前会被阻塞

    发送操作:

    1. 复制元素值

    2. 放置副本到通道内部

      接收操作:

    3. 复制通道内的元素值

    4. 放置副本到接收方

    5. 删除原值

      在所有步骤完全完成前,发起该操作的代码会一直阻塞,直到该代码所在goroutine收到了运行时系统的通知并重新获得运行机会为止。

      如此阻塞代码就是为了实现操作的互斥和元素值的完整

长时间阻塞的发送和接收操作

缓冲通道

  1. 如果通道已满,对它的所有发送操作都会被阻塞,直到通道中有元素值被接收走,此时通道会优先通知最早因此而等待的那个发送操作所在的goroutine,然后再次执行发送操作。

由于发送操作在这种情况下被阻塞后,它们所在的goroutine会顺序地进入通道内部的发送等待队列,所以通知的顺序是公平的。

  1. 如果通道已空,对它的所有接收操作都会被阻塞,直到通道中有新的元素出现,此时通道会优先通知最早等待的那个接收操作所在的goroutine,并使它再次执行接收操作。

因此而等待的所有接收操作所在的goroutine都会按照先后顺序被放入通道内部的接收等待队列。

缓冲通道作为收发双方的中间件,元素值先从发送方复制到缓冲通道,之后再由缓冲通道复制给接收方。当发送操作在执行的时候发现空的通道中,正好有等待的接收操作,那么它会直接把元素值复制给接收方

非缓冲通道

无论是发送操作还是接收操作,一开始执行就会被阻塞,直到配对的操作也开始执行,才会继续传递。

非缓冲通道是在用同步的方式传递数据,只有收发双方对接上了,数据才会被传递。

数据直接从发送方复制到接收方,中间并没有非缓冲通道做中转,相比之下,缓冲通道则在用异步的方式传递数据。

错误使用通道

对值为nil的通道,不论它的具体类型是什么,对它的发送和接收操作都会永久地处于阻塞状态。它们所属的goroutine中的任何代码都不再会被执行。

通道类型是引用,所以它的零值就是nil,只声明该类型的变量但没有用make()函数对它初始化时,该变量的值就是nil。

引起panic的发送和接收操作

  1. 对已关闭的通道进行发送操作(接收操作可以感知到通道已经关闭,并安全退出)

  2. 关闭已经关闭的通道

接收操作返回两个值:

  • 元素值

  • 接收操作成功与否

如果通道关闭时,里面有元素值未取出,接收操作会把通道中的值按顺序取值之后,在返回通道已关闭的false判断。因此,通过接收操作的返回值的第二个值来判断通道是否关闭有延迟。

除非有特殊保证,否则让发送方关闭通道,而不是接收方。

单向通道

  • 通常说的通道都是双向通道,可以发也可以收。

  • 单向通道:只能发或者之只能收。

一个通道是双向还是单向,由类型字面量体现。

var uselessChan = make(chan<- int,1)    // 发送通道,只能发(往通道中发送)

uselessChan = make(<-chan int,1)        // 接收通道,只能收(从通道中接收)

站在操作通道的代码的角度,看单向通道是发送通道还是接收通道

单向通道的价值

单向通道最主要的用途是约束其他代码的行为

例子:

func SendInt(ch chan<- int) {
    ch <- rand.Intn(1000)
}
// SendInt函数,只能接受一个发送通道,函数中的代码只能向通道中发送元素值,而不能从通道中接收元素值

在实际场景中,约束一般出现在接口类型声明中的某个方法定义上,或者,声明函数类型时,如果使用单向通道,相当于约束所有实现这个函数类型的函数。在编写模板代码或可扩展的程序库是很有用

type Notifier interface {
    SendInt(ch chan<- int)
}
// 定义SendInt方法,参数是一个发送通道
// 该接口的所实现类型中的SendInt方法都受到限制

// 在调用SendInt函数时,只需要将一个双向通道作为参数传递给它,Go语言会自定把它转换为所需的单向通道

在接口类型声明的花括号中,每一行代表一个方法的定义。

接口中方法定义与函数声明很类似,只包含方法名,参数列表和结果列表。一个类型如果想要成为一个接口的实现类型,必须实现接口中定义的所有方法。因此某个方法中定义了单向通道,那么相当于对它的所有实现做出约束

func getIntChan() <-chan int {
    num := 5
    ch := make(chan int, num)
    for i := 0; i < num; i++ {
        ch <- i
    }
    close(ch)
    return ch
}

// 在函数的结果类别中使用单向通道
// 得到该通道的程序,只能从通道中接收元素值,这是对函数调用方法的一种约束

intChan2 := getIntChan()
for elem := range intChan2 {
    fmt.Printf("The element in intChan2: %v\n", elem)
}

// for语句循环的从单向通道中取出元素值

对上述for语句的解释:

  1. for语句会不断尝试从initChan2取出元素值,即使通道被关闭,也会取出所有剩余的元素值之后再结束执行

  2. 单向通道中没有元素值时,代码会被阻塞在for关键字那一行

  3. initChan2的值为nil,代码会被永远阻塞在for关键字那一行

上述三点是带range子句的for循环与通道的联系,Go还有专门操作通道的select语句。

select多路复用

select语句只能与通道联用,一般由若干个分支组成,每次执行select语句的时候,只有一个分支中的代码会被执行,它提供类似多路复用的能力,使得 goroutine 可以同时等待多个 channel 的读写操作。select 的形式与 switch 类似,但是要求 case 语句后面必须为 channel 的收发操作。

select语句的分支:

  1. 候选分支:以关键字case开头,后面是一个case表达式和一个冒号,从下一行开始写入,分支被选中时需要执行的语句

  2. 默认分支:default case当且仅当没有候选分支被选中时,它才会被执行,default开头后面直接是冒号,从下一行开始写入要执行的语句

select语句是专门为通道而设计的,每个case表达式中只能包含操作通道的表达式,如接收表达式。

例子:

// 准备好几个通道。
intChannels := [3]chan int{
    make(chan int, 1),
    make(chan int, 1),
    make(chan int, 1),
}

// 随机选择一个通道,并向它发送元素值。
index := rand.Intn(3)
fmt.Printf("The index: %d\n", index)
intChannels[index] <- index

// 哪一个通道中有可取的元素值,哪个对应的分支就会被执行。
select {
case <-intChannels[0]:
    fmt.Println("The first candidate case is selected.")
case <-intChannels[1]:
    fmt.Println("The second candidate case is selected.")
case elem := <-intChannels[2]:
    fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)
default:
    fmt.Println("No candidate case is selected!")
}

select语句的注意点:

  1. 设置默认分支后,无论涉及通道操作的表达式是否有阻塞,select语句都不会被阻塞

  2. 如果没有默认分支,一旦所有case表达式都没有满足求值条件,那么select语句就会被阻塞,直到至少有一个case表达式满足条件为止

  3. 当通道关闭后,会从通道中接收到其元素类型的零值,所以需要接收表达式的第二个结果值来判断通道是否关闭。一旦发现某个通道关闭了,应该及时屏蔽对应的分支或采取其他措施

  4. select语句只能对其中的每一个case表达式各求值一次。如果连续或定时地操作其中的通道,就需要通过for语句中嵌入select语句的方式实现。简单地在select语句的分支中使用break语句,只能结束当前的select语句的执行,而并不会对外层的for语句产生作用。这种错误的用法可能会让这个for语句无休止的运行下去。如下面的例子

intChan := make(chan int, 1)
// 一秒后关闭通道。
time.AfterFunc(time.Second, func() {
    close(intChan)
})
select {
case _, ok := <-intChan:
    if !ok {
        fmt.Println("The candidate case is closed.")
        break
    }
    fmt.Println("The candidate case is selected.")
}

分支选择规则

  1. 每一个case表达式,至少有一个发送或接收操作,也可以包含其他的表达式。多个表达式从左到右顺序被求值

  2. select语句包含的候选分支中的case表达式会在该语句执行时先被求值,求值顺序从代码编写的顺序从上往下,所有分支都会被求值,从上到下,从左往右

  3. case表达式中的发送或接收操作处于阻塞状态时,该case表达式的求值就不成功,即候选分支不满足条件

  4. 只有当所有case表达式都被求值完成后,才开始选择候选分支。只会挑选满足条件的候选分支执行

    1. 所有候选分支都不满足条件,选择默认分支

    2. 没有默认分支,select语句处于阻塞状态,直到至少有一个候选分支满足条件为止

  5. 如果同时有多个候选分支满足条件,用伪随机算法在候选分支中选择一个,然后执行

  6. 一个select语句只有一个默认分支,并且默认分支只在无候选分支可选的时候才会执行,与编写位置无关

  7. select语句的每次执行,包括case表达式求值和分支选择都是独立的,它的执行是否并发安全,要看其中的case表达式以及分支中是否包含并发不安全的代码

四大用法

  • 满足条件的case是随机选择的

  • 增加超时机制time.AfterFunc()

  • 检查channel是否已满

  • for+select,要在 select 区间直接结束掉 for 循环,只能使用 break <标识> 来结束(标识定义在for循环之外)

并发设计模式

下面每一种模式的设计都依赖于 channel。

Barrier模式

barrier 屏障模式故名思义就是一种屏障,用来阻塞直到聚合所有 goroutine 返回结果。可以使用 channel 来实现。

使用场景

  • 多个网络请求并发,聚合结果

  • 粗粒度任务拆分并发执行,聚合结果

/*
* Barrier
*/
type barrierResp struct {
   Err error
   Resp string
   Status int
}

// 构造请求
func makeRequest(out chan<- barrierResp, url string) {
   res := barrierResp{}

   client := http.Client{
       Timeout: time.Duration(2*time.Microsecond),
  }

   resp, err := client.Get(url)
   if resp != nil {
       res.Status = resp.StatusCode
  }
   if err != nil {
       res.Err = err
       out <- res
       return
  }

   byt, err := ioutil.ReadAll(resp.Body)
   defer resp.Body.Close()
   if err != nil {
       res.Err = err
       out <- res
       return
  }

   res.Resp = string(byt)
   out <- res
}

// 合并结果
func barrier(endpoints ...string) {
   requestNumber := len(endpoints)

   in := make(chan barrierResp, requestNumber)
   response := make([]barrierResp, requestNumber)

   defer close(in)

   for _, endpoints := range endpoints {
       go makeRequest(in, endpoints)
  }

   var hasError bool
   for i := 0; i < requestNumber; i++ {
       resp := <-in
       if resp.Err != nil {
           fmt.Println("ERROR: ", resp.Err, resp.Status)
           hasError = true
      }
       response[i] = resp
  }
   if !hasError {
       for _, resp := range response {
           fmt.Println(resp.Status)
      }
  }
}

func main() {
   barrier([]string{"https://www.baidu.com", "http://www.sina.com", "https://segmentfault.com/"}...)
}

Barrier 模式也可以使用 golang.org/x/sync/errgroup 扩展库来实现,这样更加简单明了。这个包有点类似于 sync.WaitGroup,但是区别是当其中一个任务发生错误时,可以返回该错误。

func barrier(endpoints ...string) {
   var g errgroup.Group
   var mu sync.Mutex

   response := make([]barrierResp, len(endpoints))

   for i, endpoint := range endpoints {
       i, endpoint := i, endpoint // create locals for closure below
       g.Go(func() error {
           res := barrierResp{}
           resp, err := http.Get(endpoint)
           if err != nil {
               return err
          }

           byt, err := ioutil.ReadAll(resp.Body)
           defer resp.Body.Close()
           if err != nil {
               return err
          }

           res.Resp = string(byt)
           mu.Lock()
           response[i] = res
           mu.Unlock()
           return err
      })
  }
   if err := g.Wait(); err != nil {
      fmt.Println(err)
  }
   for _, resp := range response {
       fmt.Println(resp.Status)
  }
}

Future模式

常用在异步处理也称为 Promise 模式,采用一种 fire-and-forget 的方式,是指主 goroutine 不等子 goroutine 执行完就直接返回了,然后等到未来执行完的时候再去取结果。在 Go 中由于 goroutine 的存在,实现这种模式是挺简单的。

使用场景

  • 异步

/*
* Future
*/
type Function func(string) (string, error)

type Future interface {
   SuccessCallback() error
   FailCallback()    error
   Execute(Function) (bool, chan struct{})
}

type AccountCache struct {
   Name string
}

func (a *AccountCache) SuccessCallback() error {
   fmt.Println("It's success~")
   return nil
}

func (a *AccountCache) FailCallback() error {
   fmt.Println("It's fail~")
   return nil
}

func (a *AccountCache) Execute(f Function) (bool, chan struct{}){
    // 空  struct 在 Go 中占的内存是最少的
   done := make(chan struct{})
   go func(a *AccountCache) {
       _, err := f(a.Name)
       if err != nil {
           _ = a.FailCallback()
      } else {
           _ = a.SuccessCallback()
      }
       done <- struct{}{}
  }(a)
   return true, done
}

func NewAccountCache(name string) *AccountCache {
   return &AccountCache{
       name,
  }
}

func testFuture() {
   var future Future
   future = NewAccountCache("Tom")
   updateFunc := func(name string) (string, error){
       fmt.Println("cache update:", name)
       return name, nil
  }
   _, done := future.Execute(updateFunc)
   defer func() {
       <-done
  }()
}

func main() {
   var future Future
   future = NewAccountCache("Tom")
   updateFunc := func(name string) (string, error){
       fmt.Println("cache update:", name)
       return name, nil
  }
   _, done := future.Execute(updateFunc)
   defer func() {
       <-done
  }()
   // do something
}

Pipeline模式

注意和 Barrire 模式不同的是,它是按顺序的,类似于流水线,通过 buffer channel 将多个goroutine串起来,只要前序 goroutine 处理完一部分数据,就往下传递,达到并行的目的。

使用场景

  • 利用多核的优势把一段粗粒度逻辑分解成多个 goroutine 执行

/*
* Pipeline 模式
*
* 实现一个功能,给定一个切片,然后求它的子项的平方和。
*
* 例如,[1, 2, 3] -> 1^2 + 2^2 + 3^2 = 14。
*
* 正常的逻辑,遍历切片,然后求平方累加。使用 pipeline 模式,可以把求和和求平方拆分出来并行计算。
*/

func generator(max int) <-chan int{
   out := make(chan int, 100)
   go func() {
       for i := 1; i <= max; i++ {
           out <- i
      }
       close(out)
  }()
   return out
}

func power(in <-chan int) <-chan int{
   out := make(chan int, 100)
   go func() {
       for v := range in {
           out <- v * v
      }
       close(out)
  }()
   return out
}

func sum(in <-chan int) <-chan int{
   out := make(chan int, 100)
   go func() {
       var sum int
       for v := range in {
           sum += v
      }
       out <- sum
       close(out)
  }()
   return out
}

func main() {
   // [1, 2, 3]
   fmt.Println(<-sum(power(generator(3))))
}

Worker Pool模式

使用场景

  • 高并发任务

在 Go 中 goroutine 已经足够轻量,甚至 net/http server 的处理方式也是 goroutine-per-connection 的,所以比起其他语言来说可能场景稍微少一些。每个 goroutine 的初始内存消耗在 2~8kb,当我们有大批量任务的时候,需要起很多 goroutine 来处理,这会给系统代理很大的内存开销和 GC 压力,这个时候就可以考虑一下协程池。

/*
* Worker pool
*/
type TaskHandler func(interface{})

type Task struct {
   Param   interface{}
   Handler TaskHandler
}

type WorkerPoolImpl interface {
   AddWorker()                  // 增加 worker
   SendTask(Task)               // 发送任务
   Release()                    // 释放
}

type WorkerPool struct {
   wg   sync.WaitGroup
   inCh chan Task
}

func (d *WorkerPool) AddWorker() {
   d.wg.Add(1)
   go func(){
       for task := range d.inCh {
           task.Handler(task.Param)
      }
       d.wg.Done()
  }()
}

func (d *WorkerPool) Release() {
   close(d.inCh)
   d.wg.Wait()
}

func (d *WorkerPool) SendTask(t Task) {
   d.inCh <- t
}

func NewWorkerPool(buffer int) WorkerPoolImpl {
   return &WorkerPool{
       inCh: make(chan Task, buffer),
  }
}

func main() {
   bufferSize := 100
   var workerPool = NewWorkerPool(bufferSize)
   workers := 4
   for i := 0; i < workers; i++ {
       workerPool.AddWorker()
  }

   var sum int32
   testFunc := func (i interface{}) {
       n := i.(int32)
       atomic.AddInt32(&sum, n)
  }
   var i, n int32
   n = 1000
   for ; i < n; i++ {
       task := Task{
           i,
           testFunc,
      }
       workerPool.SendTask(task)
  }
   workerPool.Release()
   fmt.Println(sum)
}

Pub/Sub模式

发布订阅模式是一种消息通知模式,发布者发送消息,订阅者接收消息。

使用场景

  • 消息队列

/*
* Pub/Sub
*/
type Subscriber struct {
   in     chan interface{}
   id     int
   topic  string
   stop   chan struct{}
}

func (s *Subscriber) Close() {
   s.stop <- struct{}{}
   close(s.in)
}

func (s *Subscriber) Notify(msg interface{}) (err error) {
   defer func() {
       if rec := recover(); rec != nil {
           err = fmt.Errorf("%#v", rec)
      }
  }()
   select {
   case s.in <-msg:
   case <-time.After(time.Second):
       err = fmt.Errorf("Timeout\n")
  }
   return
}

func NewSubscriber(id int) SubscriberImpl {
   s := &Subscriber{
       id: id,
       in: make(chan interface{}),
       stop: make(chan struct{}),
  }
   go func() {
       for{
           select {
           case <-s.stop:
               close(s.stop)
               return
           default:
               for msg := range s.in {
                   fmt.Printf("(W%d): %v\n", s.id, msg)
              }
          }
  }}()
   return s
}

// 订阅者需要实现的方法
type SubscriberImpl interface {
   Notify(interface{}) error
   Close()
}

// sub 订阅 pub
func Register(sub Subscriber, pub *publisher){
   pub.addSubCh <- sub
   return
}

// pub 结果定义
type publisher struct {
   subscribers []SubscriberImpl
   addSubCh    chan SubscriberImpl
   removeSubCh chan SubscriberImpl
   in          chan interface{}
   stop        chan struct{}
}

// 实例化
func NewPublisher () *publisher{
   return &publisher{
       addSubCh: make(chan SubscriberImpl),
       removeSubCh: make(chan SubscriberImpl),
       in: make(chan interface{}),
       stop: make(chan struct{}),
  }
}

// 监听
func (p *publisher) start() {
   for {
       select {
       // pub 发送消息
       case msg := <-p.in:
           for _, sub := range p.subscribers{
               _ = sub.Notify(msg)
          }
       // 移除指定 sub
       case sub := <-p.removeSubCh:
           for i, candidate := range p.subscribers {
               if candidate == sub {
                   p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
                   candidate.Close()
                   break
              }
          }
       // 增加一个 sub
       case sub := <-p.addSubCh:
           p.subscribers = append(p.subscribers, sub)
       // 关闭 pub
       case <-p.stop:
           for _, sub := range p.subscribers {
               sub.Close()
          }
           close(p.addSubCh)
           close(p.in)
           close(p.removeSubCh)
           return
      }
  }
}


func main() {
   // 测试代码
   pub := NewPublisher()
   go pub.start()

   sub1 := NewWriterSubscriber(1)
   Register(sub1, pub)

   sub2 := NewWriterSubscriber(2)
   Register(sub2, pub)

   commands:= []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
   for _, c := range commands {
       pub.in <- c
  }

   pub.stop <- struct{}{}
   time.Sleep(time.Second*1)
}

注意事项

  • 同步问题,尤其同步原语和 channel 一起用时,容易出现死锁

  • goroutine 崩溃问题,如果子 goroutine panic 没有 recover 会引起主 goroutine 异常退出

  • goroutine 泄漏问题,确保 goroutine 能正常关闭

最后更新于

这有帮助吗?