[Go] 写一个守护协程的通用套路是什么?

背景

根据一个 Goroutine 是否直接依赖用户交互,我们可以将 Goroutine 分为两大类,一类是直接依赖用户交互的前台协程,比如 HTTP Server Handler等;另一类是不直接依赖用户交互的后台协程,比如 HTTP Server,定时任务协程等。前台协程随用户的交互开始执行,随交互结束而结束,比较容易设计。本文主要讨论后台协程设计的一些通用套路。

一个良好的后台协程需要至少满足以下两个诉求:

  • 容易控制,尤其是启动、停止、重启等操作。
  • 状态容易被观察,比如是否正在运行中。

针对这两个诉求,我们来寻找一个通用的实现套路。

设计与实现

简陋的后台协程

得益于 Go 从语法上对并发的支持,写一个简陋的后台协程再简单不过了。我们从下面这个 Demo 开始讨论,这个 Demo 的任务很简单,每隔一秒钟将下一个斐波那契数输出在标准输出里面。

package main

type Fibonacci struct {
	a, b int
}

func NewFibonacci() *Fibonacci {
	return &Fibonacci{a:0, b:1}
}

func (f *Fibonacci) Run() {
    go func() {
		for {
			time.Sleep(time.Second)
			fmt.Println(f.b)
			f.a, f.b = f.b, f.a + f.b
		}
	}()
}

func main() {
	NewFibonacci().Run()
}

直接执行这个程序,什么都不会输出,因为主协程里面没有任何逻辑执行,程序启动后直接就退出了,对吧?不过现实中许多后台协程就是这样写的,因为真实世界里很多主协程是有其它任务在执行的,所以 Fibonacci 会一直执行下去,直到程序结束。

入门级的后台协程

观察上面这个 Fibonacci 我们会发现它的一些缺陷:首先我们没法终止它,一旦启动就失控了;其次我们也没法观察它,比如在任何时候去向它要一个当前时间的斐波那契数,是要不到的。

先说控制,我们很容易想到一种方式,就是使用一个bool变量去维护协程是否需要继续运行下去。

然后获取斐波那契数这个事情也很简单,加一个方法就好了。

实际上,这种方案就是我遇到的大多数协程的实现方式。我们在 Fibonacci 上按这个方案写,代码就是这样:

type Fibonacci struct {
	a, b int
	stop bool
	mtx sync.Mutex
}

func NewFibonacci() *Fibonacci {
	return &Fibonacci{a:0, b:1}
}

func (f *Fibonacci) Run() {
	go func() {
		for {
			if f.isStop() {
				break
			}
			time.Sleep(time.Second)
			f.mtx.Lock()
			fmt.Println(f.b)
			f.a, f.b = f.b, f.a + f.b
			f.mtx.Unlock()
		}
	}()
}

// 调用 Stop 结束
func (f *Fibonacci) Stop() {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	f.stop = true
}

func (f *Fibonacci) isStop() {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	return f.stop
}

// Value 获取当前的斐波那契数
func (f *Fibonacci) Value() int {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	return f.b
}

进阶版的后台协程

观察入门版的代码,我们会发现一些潜在的问题。首先,添加bool变量的方法的问题是需要自己维护一把锁,随着程序的升级,这把锁有可能会被用去保护别的变量,比如在代码中我们就用它来保护斐波那契数了。这样的做法可能会带来性能下降,如果逻辑不对甚至可能会出现死锁问题。

另外我们继续观察这段代码还会发现另一个问题,即我们调用Stop后,实际上很可能协程并不会马上结束,它有可能正好处在 Sleep 状态,所以 Stop 调用后,很可能过几秒会再打印一个数,然后协程才结束。

一般做到这一步时,会有人用想到用 channel 来代替bool变量了。我遇到的部分有经验的工程师会用这个办法。用 channel 有一个好处,是可以通过对多个channel同时select监听的方式,达到立马生效的效果。代码如下:

type Fibonacci struct {
	a, b int
	stop chan struct{}
	mtx sync.Mutex
}

func NewFibonacci() *Fibonacci {
	return &Fibonacci{
		a: 0,
		b: 1,
		stop: make(chan struct{}),
	}
}

func (f *Fibonacci) Run() {
	go func() {
		t := time.NewTicker(time.Second)
		for {
			select {
			case <-f.stop:
				t.Stop()
				return
			case <-t.C:
				f.mtx.Lock()
				fmt.Println(f.b)
                f.a, f.b = f.b, f.a + f.b
				f.mtx.Unlock()
			}
		}
	}()
}

// 调用 Stop 结束
func (f *Fibonacci) Stop() {
	close(f.stop)
}

// Value 获取当前的斐波那契数
func (f *Fibonacci) Value() int {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	return f.b
}

这段代码基本上就是比较常见的实现得比较好的后台协程代码了,我们调用Start(),它就执行,调用Stop(),就立马结束,调用Value()就拿到结果。看上去还不错。

更好的后台协程

我们观察进阶版的实现,似乎挑不出什么毛病了。但实际上还有三个问题。

第一个问题是,如果程序中有不定量的类似 Fibonacci 这样的后台协程,如何用一套简单且行之有效的方式统一地控制它们,同时也保留单个控制的能力?

有一种简单的想法是,在程序中声明一个带Stop方法interface,然后用一个slice或map保存所有可以Stop的后台协程,在需要Stop的时候依次调用它们。

第二个问题是,如果连续调用Stop()两次,第二次就会因为关闭一个已经关闭的channel而出现panic。

第三个问题是,在这段代码中我们只是计算一下f.a+f.b并且print出来,不太会panic。在真实的代码中后台协程代码是有可能出现panic的,我们不光要避免这种panic由于未被recover导致整个程序崩溃,还需要在出现panic后自动恢复。

这些问题我们要自己解决起来也不是不行,但是如果自己解决下去的话,会写出很多代码,这不符合我对通用套路的标准:容易理解,实现成本低,不会因为过于复杂而难以在每个地方使用。

那么有没有简单高效的办法做到写出一个优雅的后台协程呢?办法是有的,答案就在标准库的 context 包里面。

下面就是这个套路的代码。

type Fibonacci struct {
	a, b int
	stop func()
	mtx sync.Mutex
}

func NewFibonacci() *Fibonacci {
	return &Fibonacci{a: 0, b: 1}
}

func (f *Fibonacci) Run(ctx context.Context) {
	// 使用WithCancel派生一个可被取消的ctx,用来控制后台
	// 协程。
	ctx, f.stop = context.WithCancel(ctx)
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-f.loop(ctx):
				// f.loop() 在正常运行时errch是阻塞状态,如果
				// 出错了才有数据,此时select会被唤起,并重新
				// 启动 loop(),实现panic后自动恢复。
			}
		}
	}()
}

func (f *Fibonacci) loop(ctx context.Context) <-chan error {
	errch := make(chan error)
	go func() {
		t := time.NewTicker(time.Second)
		defer func() {
			t.Stop()
			if r := recover(); r != nil {
				errch <- fmt.Errorf("panic with error %v", r)
                close(errch)
			}
		}()
		for {
			select {
			case <-ctx.Done():
                close(errch)
				return
			case <-t.C:
				f.nextFibonacci()
			}
		}
	}()
	return errch
}

func (f *Fibonacci) nextFibonacci() {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	fmt.Println(f.b)
    f.a, f.b = f.b, f.a + f.b
}

// 调用 Stop 结束
func (f *Fibonacci) Stop() {
	if f.stop != nil {
		f.stop()
	}
}

// Value 获取当前的斐波那契数
func (f *Fibonacci) Value() int {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	return f.b
}

我们来简单地看一下这个代码的几个关键点:

  1. Run 方法要求外部传入一个 Context,这样当外部取消这个 Context 时,Fibonacci 实际上也就结束了。
  2. Run 方法内部基于传入的 Context 又派生了一个 Context 出来,这样做的目的是为 stop 方法赋值,调用 f.stop 的时候,实际上就是调用Cancel方法来取消派生出来的 Context。
  3. Run 并不直接执行业务逻辑,而是另起loop协程去执行,Run 本身实际上是监督loop的执行,一旦loop出现panic,及时将其重启。当然,loop协程也是通过Context来控制的。

调用示例

最基本的调用如下:

f := NewFibonacci().Run(context.Background())
// ... 执行一些其它操作
f.Stop()

我们可以创建一大堆类似 Fibonacci 这样用 Context 控制的后台协程,然后很轻松地将他们全部结束。

ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 100; i++ {
	NewFibonacci().Run(ctx)
}
// ... 执行一些其它操作
// 调用cancel,100个后台协程全部结束
cancel()

我们也可以用 context.WithTimeout 创建带超时的 context,让 Fibonacci 后台只执行一小段时间。

ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
for i := 0; i < 100; i++ {
	NewFibonacci().Run(ctx)
}
<-ctx.Done()
cancel()

最重要的是,得益于 Context 在标准库中的广泛支持,我们可以很容易地将 Fibonacci 这种实现与各种控制方法结合起来,例如与 HTTP Request 结合,当一个请求进来时启动一个 Fibonacci,并且在请求结束后自动结束。

func ServeHTTP(w http.ResponseWriter, r *http.Request) {
	NewFibonacci().Run(r.Context())
	// ... 执行 Request 的处理逻辑
}

总结

我们讨论了写后台协程的一个通用套路,在这个套路里面有两个核心点需要遵循。

第一点是后台协程通过监听 Context 而不是自己创建的某个变量去做启停控制,这个 Context 有两个要点:从外部传入,在内部派生。

第二点是后台协程应该考虑实现类似 supervisor 这样的自动重启机制,在任务结束时自动恢复。

以上就是我所总结的写 Go 守护协程的套路,如果你发现我的方法有错误,或者你有更好的套路,欢迎留言讨论。