Go语言中的并发机制系列-4/1 | 深入理解 context 包

Posted by ShenHengheng on 2018-11-28

本系列阅读笔记是主要基于 Golang 经典图书 Concurrency in GO ,主要有6个章节:

  • An Introduction to Concurrency
  • Modeling Your Code: Communicating Sequential Processes**
  • Go’s Concurrency Building Blocks
  • Concurrency Patterns in Go
  • Concurrency at Scale
  • Goroutines and the Go Runtime

本部分主要摘选自第 4 章:Concurrency Patterns in Go ,Go 的并发模式.

在 Go Web编程中,由于每个传进来的请求(handlerFunc)都要交给一个 goroutine 来处理,那么为了使得主进程能够感应每个请求的状态,或者互联网中常见到的超时处理等,都不免少了 context 包的应用,它很强大,简约,但很容易让人忽视.本篇文章主要深度讲解 context 包的用法.下面是 context 官方的博客给出介绍(粗略翻译):

在Go服务器中,每个传入的请求都在单独的一个 goroutine 中进行处理. 请求处理程序通常会启动其他goroutine 去访问后端,例如数据库和RPC服务. 处理请求的 goroutine 集合通常需要访问特定请求的值,例如终端用户的身份(auth),授权令牌(token/password)和请求的截止日期(deadline).当请求取消或者超时,处理该请求的所有 goroutine 都应该快速退出,以便系统可以回收它们正在使用的任何资源.

在Google,我们开发了 context 包,它可以轻松地将 请求值取消信号API边界的截止日期传递给处理该请求的所有 goroutine .

Context Type

让我们看看 context 包里面有什么吧.

var Canceled = errors.New("context canceled")
var DeadlineExceeded error = deadlineExceededError{}
type CancelFunc
type Context
func Background() Context
func TODO() Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

下面是 Context 类型的定义, Context 类型是 context 包的核心.

// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // 只有当 Context 取消或者超时,Done 将返回一个因抢占被关闭的channel.
    Done() <-chan struct{}

    // context取消的原因.
    Err() error

    // 用于标示 goroutine 是否在给定的时间后取消
    Deadline() (deadline time.Time, ok bool)

    // Context 携带的数据
    Value(key interface{}) interface{}
}
  • Done方法返回一个通道,作为代表 Context 运行的函数的取消信号:当通道关闭时,函数应该放弃它们的工作并返回。
  • Err方法返回一个错误,指示 Context 被取消的原因。
  • Go 作者注意到 goroutines 的主要用途之一是为请求提供服务.通常在这些程序中,除了有关抢占的信息之外,还需要传递特定请求的信息.这是 Value 函数的目的.

这种类型将像 Done 通道一样流经系统.如果使用 context 包, 顶级并发调用子/下游程序的每个函数都会将 Context 类型变量作为其第一个参数. 就像下面这样调用一样:

import "golang.org/x/net/context"

// ReadFile reads file name and returns its contents.
// If ctx.Done is closed, ReadFile returns ctx.Err immediately.
func ReadFile(ctx context.Context, name string) ([]byte, error)

现在我们只需要知道 context 包主要有两个目的:

  • To provide an API for canceling branches of your call-graph.
  • To provide a data-bag for transporting request-scoped data through your call-graph.

取消

函数中的取消有三个方面:

  • goroutine 的父母可能想要取消它.
  • goroutine 可能想要取消它的孩子.
  • goroutine 中的任何阻塞操作都是可抢占的,以便可以取消它.(被抢占)

context 包都已经实现了这三种方面.

正如之前定义的那样,Context 类型将会是函数的第一个参数. 如果你看一下Context接口上的方法,你会发现没有任何东西可以改变底层结构的状态.此外,没有任何东西允许接受Context的函数取消它.这可以保护来自取消上下文的子代的调用堆栈的功能.结合提供完成通道的 Done 方法,这允许Context类型从其前提安全地管理取消.

context 包提供了从现有 context 值中派生新的 Context 值的函数。这些值构成一个树:当一个Context被取消时,从它派生的所有上下文也被取消。

Context tree

Context tree

A = context.Background()
B = context.WithCancel(A)
C = context.WithValue(A, “Value”, “C”)
D = context.WithValue(A, “Value”, “D”)
E = context.WithValue(B, “B Key”, “E”)
..
H = context.WithValue(D, “D Key”, “H”)
..

这提出了一个问题:如果 Context 是不可变的,那么我们如何影响调用堆栈中当前函数下的函数中取消的行为?这是 context 包中的函数变得重要的地方.我们会到源码中,看看的下面三种函数:

Context 接口并没有定义,取消的方法.如果要使用,使用下面的三种函数,他们都返回可取消的 context 和 cancel 函数.

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

注意:所有这些函数都接受 Context 并返回一个Context.其中一些还涉及其他参数,如截止日期和超时时间.这些函数都生成一个新的 Context 实例,其中包含与这些函数相关的选项.

  • WithCancel 返回一个新的 Context 和一个 cancel 函数, 当调用 cancel 函数后主动关闭 完成的通道,此时 ctx.Done() 将返回。
  • WithDeadline 返回一个新的 Context 和一个 cancel 函数,当超过给定的截止时间(时间戳)时,它关闭完成的通道. 此时 ctx.Done() 将返回。
  • WithTimeout 返回一个新的 Context 和一个 cancel函数,在超过给定的持续时间(持续时间长度)后关闭完成的通道. 此时 ctx.Done() 将返回。

注意: 关闭通道与关闭文件或者关闭套接字不一样,关闭通道并不会使通道的机能完全停止— 他的作用就通知其他正在尝试从这个通道接受值的 goroutine,这个通道已经不再接受任何值了。

取消实例

// goroutine #1
ctx, cancel := context.WithCancel(parent)
...
data, err := ReadFile(ctx, name)

// goroutine #2
cancel()

context 包提供了两种创建空的 context 实例的方法:

func Background() Context
func TODO() Context

Background 简单地返回一个空的 ContextTODO 不是应用在生产环境下的,但是也返回一个空的 ContextTODO 的目的是作为 Context 的占位符,当不知道要使用哪个 Context 时, 或者你的代码将提供 Context 时,但上游代码尚未提供。

func main() {
  var wg sync.WaitGroup
  done := make(chan interface{})
  defer close(done)
  wg.Add(1)
  go func() {
    defer wg.Done()
    if err := printGreeting(done); err != nil {
      fmt.Printf("%v", err)
      return
    }
  }()
  wg.Add(1)
  go func() {
    defer wg.Done()
    if err := printFarewell(done); err != nil {
      fmt.Printf("%v", err)
      return
    }
  }()
  wg.Wait()
}

func printGreeting(done <-chan interface{}) error {
  greeting, err := genGreeting(done)
  if err != nil {
    return err
  }
  fmt.Printf("%s world!\n", greeting)
  return nil
}

func printFarewell(done <-chan interface{}) error {
  farewell, err := genFarewell(done)
  if err != nil {
    return err
  }
  fmt.Printf("%s world!\n", farewell)
  return nil
}

func genGreeting(done <-chan interface{}) (string, error) {
  switch locale, err := locale(done); {
  case err != nil:
    return "", err
  case locale == "EN/US":
    return "hello", nil
  }
  return "", fmt.Errorf("unsupported locale")
}

func genFarewell(done <-chan interface{}) (string, error) {
  switch locale, err := locale(done); {
  case err != nil:
    return "", err
  case locale == "EN/US":
    return "goodbye", nil
  }
  return "", fmt.Errorf("unsupported locale")
}

func locale(done <-chan interface{}) (string, error) {
  select {
  case <-done:
    return "", fmt.Errorf("canceled")
  case <-time.After(1*time.Minute):
  }
  return "EN/US", nil
}

运行结果为:

goodbye world
hello world

忽略竞争条件,我们可以看到程序中有两个分支同时运行。我们通过创建一个 done 通道并将其传递给我们的调用图来设置标准抢占方法。如果我们在 main 函数中的任何一点关闭完成通道,则两个分支都将被取消。

通过在 main 函数中引入 goroutine,我们开辟了以一些不同且有趣的方式控制该程序的可能性。也许我们希望 genGreeting 如果花费太长时间就超时。如果我们知道它的父母将很快被取消,我们可能不希望genFarewell 调用语言环境。在每个堆栈帧,函数可以影响它下面的整个调用堆栈。

使用 done 通道模式,我们可以通过将传入的 done 通道包装在其他完成的通道中然后在其中任何一个触发时返回来完成此操作,但是我们不会获得有关上下文给出的最后期限和错误的额外信息。

下面使用 context 包修改一下:

func main() {
  var wg sync.WaitGroup
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  wg.Add(1)
  go func() {
    defer wg.Done()
    if err := printGreeting(ctx); err != nil {
    fmt.Printf("cannot print greeting: %v\n", err)
    cancel()
    }
  }()
  wg.Add(1)
  go func() {
    defer wg.Done()
    if err := printFarewell(ctx); err != nil {
    fmt.Printf("cannot print farewell: %v\n", err)
    }
  }()
  wg.Wait()
}
func printGreeting(ctx context.Context) error {
  greeting, err := genGreeting(ctx)
  if err != nil {
    return err
  }
  fmt.Printf("%s world!\n", greeting)
  return nil
}
func printFarewell(ctx context.Context) error {
  farewell, err := genFarewell(ctx)
  if err != nil {
    return err
  }
  fmt.Printf("%s world!\n", farewell)
  return nil
}
func genGreeting(ctx context.Context) (string, error) {
  ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
  defer cancel()
  switch locale, err := locale(ctx); {
  case err != nil:
    return "", err
  case locale == "EN/US":
    return "hello", nil
  }
  return "", fmt.Errorf("unsupported locale")
}
func genFarewell(ctx context.Context) (string, error) {
  switch locale, err := locale(ctx); {
  case err != nil:
    return "", err
  case locale == "EN/US":
    return "goodbye", nil
  }
  return "", fmt.Errorf("unsupported locale")
}
func locale(ctx context.Context) (string, error) {
  select {
  case <-ctx.Done():
    return "", ctx.Err()
  case <-time.After(1 * time.Minute):
  }
  return "EN/US", nil
}