前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Continuation - 连接异步任务和同步代码

Continuation - 连接异步任务和同步代码

原创
作者头像
DerekYuYi
发布2022-01-23 20:13:15
2.1K0
发布2022-01-23 20:13:15
举报
文章被收录于专栏:Swift-开源分析
  • 提议:SE-0300
  • 作者:John McCall, Joe Groff, Doug Gregor, Konrad Malawski
  • 审核主管:Ben Cohen
  • 状态:在 Swift 5.5 已实现
  • 历史修订版本:1, 2

介绍

异步 Swift 代码需要能够和现有同步代码一起使用,这些同步代码使用 completion 回调或者 delegate 方法等技术来响应事件。在 continuations 上,异步任务可以挂起自身,同步代码能够捕获并调用 continuations 来恢复任务,响应事件。

Swift-evolution 关键点时间线:

动机

Swift APIs 经常通过 callback 的方式提供异步代码执行操作。这可能是因为代码本身是在引入 async/await 之前编写的,也可能因为它与一些主要由事件驱动组成的系统相关联,在这种情况下,可能需要在内部使用 callback 的同时向程序提供异步接口。调用异步任务需要能够挂起其本身,同时为事件驱动同步系统提供一种机制来恢复它以响应事件。

提议的解决方案

Swift 库将会提供 API 用来为当前异步任务获取 continuation。获取任务的 continuation 会挂起该任务,并产生一个值,同步代码可以使用 handle 来恢复任务。最终给定的 API 基于 completion callback,例如:

代码语言:swift
复制
func beginOperation(completion: (OperationResult) -> Void)

我们可以把上述beginOperation(completion:)转为一个async接口,即通过挂起该任务并在调用 callback 时,使用该任务的 continuation 恢复它,并把传进 callback 的参数转为异步函数的正常返回值:

代码语言:swift
复制
func operation() async -> OperationResult {
  // 挂起当前任务,并把它的 continuation 传给 closure,该 closure 会直接执行
  return await withUnsafeContinuation { continuation in
    // 调用同步基于回调的 API(the synchronous callback-based API)
    beginOperation(completion: { result in
      // 当执行回调时,恢复 continuation
      continuation.resume(returning: result)
    }) 
  }
}

设计细节

原始 unsafe continuations

Swift 库提供了两个函数:withUnsafeContinuationwithUnsafeThrowingContinuation,它们均允许从异步代码内部调用基于 callback 的API。每个函数都接受一个 operation 闭包参数,基于 callback 的 API 将会调用该闭包。这个operation 闭包参数接受一个 continuation 实例,该 continuation 实例必须在 callback 中执行恢复操作,提供返回值或者抛出错误,它们会在异步任务恢复时,成为withUnsafeContinuationwithUnsafeThrowingContinuation的调用结果。

代码语言:swift
复制
struct UnsafeContinuation<T, E: Error> {
  func resume(returning: T)
  func resume(throwing: E)
  func resume(with result: Result<T, E>)
}

extension UnsafeContinuation where T == Void {
  func resume() { resume(returning: ()) }
}

extension UnsafeContinuation where E == Error {
  // Allow covariant use of a `Result` with a stricter error type than
  // the continuation:
  func resume<ResultError: Error>(with result: Result<T, ResultError>)
}

func withUnsafeContinuation<T>(
    _ operation: (UnsafeContinuation<T, Never>) -> ()
) async -> T

func withUnsafeThrowingContinuation<T>(
    _ operation: (UnsafeContinuation<T, Error>) throws -> ()
) async throws -> T

在当前任务上下文中,withUnsafe*Continuation(表示withUnsafeContinuationwithUnsafeThrowingContinuation两个函数,下文类似)将会立即执行 operation 参数对应的闭包,并传入用于恢复任务的 continuation 参数值。operation必须安排 continuation 在之后的某个点恢复。在operation函数返回后,当前任务也已经挂起。当前任务必须通过调用 continuation 的resume方法跳出挂起状态。注意resume在将任务从暂停状态转换出来后,会立即把上下文的控制权返回给调用者,如果任务所在的执行器不重新调度它,任务本身实际上不会恢复执行。resume(throwing:)可用来通过传递给定错误来恢复任务。为了方便起见,可以使用给定的Result,resume(with:)通过正常返回或者根据Result状态引发错误来恢复任务。如果operation在返回前引发了未捕获的错误,这就好像 operation 调用了resume(throwing:)并出现错误一样。

如果withUnsafe*Continuation返回类型是Void,当调用resume(returning:)函数时,必须指定()的值。这样做会出现奇怪的代码(比如resume(returning: ())),所以Unsafe*Continuation<Void>有另一个成员函数resume(),让resume调用可读性更强。

调用withUnsafeContinuation之后,resume函数在程序每个执行路径必须且仅调用一次。Unsafe*Continuation是一个不安全的接口,因此如果在同一个 continuation 上多次调用resume方法,会出现未定义的行为。任务在恢复执行之前都是挂起状态,如果 continuation 取消且从未调用resume,此时任务在程序结束之前都一直保持挂起状态,会造成它所有的资源发生内存泄漏。包装器(Wrapper)可以提供对这些误用 continuation 的检查,库也会提供一个这样的包装器,如下所述。

例如,使用Unsafe*ContinuationAPI,可以包装这样的函数(例子为了表现 continuation API 的灵活性,故意编写的比较复杂):

代码语言:swift
复制
func buyVegetables(
  shoppingList: [String],
  // a) if all veggies were in store, this is invoked *exactly-once*
  onGotAllVegetables: ([Vegetable]) -> (),

  // b) if not all veggies were in store, invoked one by one *one or more times*
  onGotVegetable: (Vegetable) -> (),
  // b) if at least one onGotVegetable was called *exactly-once*
  //    this is invoked once no more veggies will be emitted
  onNoMoreVegetables: () -> (),
  
  // c) if no veggies _at all_ were available, this is invoked *exactly once*
  onNoVegetablesInStore: (Error) -> ()
)
// returns 1 or more vegetables or throws an error
func buyVegetables(shoppingList: [String]) async throws -> [Vegetable] {
  try await withUnsafeThrowingContinuation { continuation in
    var veggies: [Vegetable] = []

    buyVegetables(
      shoppingList: shoppingList,
      onGotAllVegetables: { veggies in continuation.resume(returning: veggies) },
      onGotVegetable: { v in veggies.append(v) },
      onNoMoreVegetables: { continuation.resume(returning: veggies) },
      onNoVegetablesInStore: { error in continuation.resume(throwing: error) },
    )
  }
}

let veggies = try await buyVegetables(shoppingList: ["onion", "bell pepper"])

由于把正确的 continuation 恢复操作调用编写到buyVegetables函数复杂的 callback 中,我们可以为该函数提供更好的重载,并允许异步代码以更自然自上而下的方式与该函数交互。

Checked continuations

Unsafe*Continuation为连接同步和异步代码提供了一种轻量机制,但它容易误用,误用会以危险的方法破坏处理状态。为了在同步和异步代码开发接口时提供额外的安全性和指导,库会提供一个包装器,用来检查continuation的不合法使用:

代码语言:swift
复制
struct CheckedContinuation<T, E: Error> {
  func resume(returning: T)
  func resume(throwing: E)
  func resume(with result: Result<T, E>)
}

extension CheckedContinuation where T == Void {
  func resume()
}

extension CheckedContinuation where E == Error {
  // Allow covariant use of a `Result` with a stricter error type than
  // the continuation:
  func resume<ResultError: Error>(with result: Result<T, ResultError>)
}

func withCheckedContinuation<T>(
    _ operation: (CheckedContinuation<T, Never>) -> ()
) async -> T

func withCheckedThrowingContinuation<T>(
  _ operation: (CheckedContinuation<T, Error>) throws -> ()
) async throws -> T

Unsafe*ContinuationAPI有意设计与Unsafe*Continuation相同,这样代码就可以轻松在已检查和未检查之间切换。例如,上面buyVegetables的例子可以通过把withUnsafeThrowingContinuation换成withCheckedThrowingContinuation选择检查:

代码语言:swift
复制
// returns 1 or more vegetables or throws an error
func buyVegetables(shoppingList: [String]) async throws -> [Vegetable] {
  try await withCheckedThrowingContinuation { continuation in
    var veggies: [Vegetable] = []

    buyVegetables(
      shoppingList: shoppingList,
      onGotAllVegetables: { veggies in continuation.resume(returning: veggies) },
      onGotVegetable: { v in veggies.append(v) },
      onNoMoreVegetables: { continuation.resume(returning: veggies) },
      onNoVegetablesInStore: { error in continuation.resume(throwing: error) },
    )
  }
}

如果程序尝试多次恢复 continuation,Unsafe*Continuation会导致未定义的行为,而CheckedContinuation会导致陷入陷阱。CheckedContinuation也会记录一个警告,如果 continuation 没有恢复任务就被丢弃,这会导致任务一直卡在挂起状态,它拥有的所有资源都会发生泄漏。无论程序的优化级别如何,都会进行这些检查。

其他例子

Continuations 也能用来与事件驱动接口交互,这些接口比 callback 更复杂。只要整个过程遵循 continuation 被正确执行恢复操作一次的要求,continuation 可以在任何地方执行恢复操作。例如,当Operation实现finish操作时,会触发 continuation 的恢复操作:

代码语言:swift
复制
class MyOperation: Operation {
  let continuation: UnsafeContinuation<OperationResult, Never>
  var result: OperationResult?

  init(continuation: UnsafeContinuation<OperationResult, Never>) {
    self.continuation = continuation
  }

  /* rest of operation populates `result`... */

  override func finish() {
    continuation.resume(returning: result!)
  }
}

func doOperation() async -> OperationResult {
  return await withUnsafeContinuation { continuation in
    MyOperation(continuation: continuation).start()
  }
}

下面例子来自 结构化并发提议 中,它把URLSession封装到任务中,允许任务的取消控制 session 的取消,并使用 continuation 来响应网络活动中的数据和错误事件:

代码语言:swift
复制
func download(url: URL) async throws -> Data? {
  var urlSessionTask: URLSessionTask?

  return try Task.withCancellationHandler {
    urlSessionTask?.cancel()
  } operation: {
    let result: Data? = try await withUnsafeThrowingContinuation { continuation in
      urlSessionTask = URLSession.shared.dataTask(with: url) { data, _, error in
        if case (let cancelled as NSURLErrorCancelled)? = error {
          continuation.resume(returning: nil)
        } else if let error = error {
          continuation.resume(throwing: error)
        } else {
          continuation.resume(returning: data)
        }
      }
      urlSessionTask?.resume()
    }
    if let result = result {
      return result
    } else {
      Task.cancel()
      return nil
    }
  }
}

基于回调 API 的包装器也可以遵守其父/当前任务的取消操作,例如:

代码语言:swift
复制
func fetch(items: Int) async throws -> [Items] {
  let worker = ... 
  return try Task.withCancellationHandler(
    handler: { worker?.cancel() }
  ) { 
    return try await withUnsafeThrowingContinuation { c in 
      worker.work(
        onNext: { value in c.resume(returning: value) },
        onCancelled: { value in c.resume(throwing: CancellationError()) },
      )
    } 
  }
}

如果任务允许有实例,可以获取调用fetch(items:)函数的任务实例,并在 withUnsafeThrowingContinuation 内部有合适场景可以调用取消时,取消对该任务的调用。

备选方案

CheckedContinuation命名为Continuation

我们可以将CheckedContinuation定位为执行同步/异步接口的"默认" API,方法是将 Checked 单词从名称中去掉。这当然符合 Swift 的常见理念,即首选安全接口,在性能是首要考虑因素的情况下,有选择得使用不安全接口。不过,有 2 个顾虑让我们没有这样做:

  • 尽管误用CheckedContinuation的后果没有误用UnsafeContinuation那么严重,但它仍然只尽力检查一些常见的误用模式,并且没有让继续误用的后果完全没有意义:丢弃没有执行恢复操作的 continuation 仍然会泄漏未恢复任务;尝试多次恢复 continuation 仍然会造成传到 continuation 中的信息丢失;如果with*Continuation操作误用了 continuation,这仍然是一个严重的编程错误。CheckedContinuation只会使错误更加明显。
  • 现在命名Continuation类型占用了一个"好"名字,如果我们在将来的某个时候只移动类型,我们希望引入一个静态强制执行"恰好一次"属性的 continuation 类型。

不要公开UnsafeContinuation

人们认为不应该暴露UnsafeContinuation,因为可以用Checked形式代替。我们认为只要用户验证了他们那些与性能敏感的 API 是正确的,就可以避免与这些 API 交互带来的检查成本。

CheckedContinuation捕获所有误用, 或者记录所有误用

CheckedContinuation建议当程序在同一个 continuation 上尝试恢复同一个任务 2 次时进行捕获,但只在放弃 continuation 而未执行恢复操作时才记录警告。我们认为这是针对这些情况的正确权衡,原因如下:

  • 对于CheckedContinuation,多次执行恢复操作会破坏任务过程,并让它处于未定义状态。通过在任务多次恢复时捕获,CheckedContinuation会把未定义行为变为定义良好的捕获情况。这点与标准库中其他 checked/unchecked 相似,比如!和对于OptionalunsafelyUnwrapped
  • 相比之下,UnsafeContinuation执行恢复操作失败,除了会泄漏挂起任务的资源,不会破坏任务;程序剩余的任务可以继续正常执行。而且,检测和报告这样泄漏的唯一办法是在类实现时使用deinit方法。由于来自 ARC 优化的再计数可变性,执行 deinit 的确切点并非完全可预测。如果捕获deinit方法,那么捕获是否执行以及何时执行可能会随着优化级别而变化,我们认为这不会带来好体验。

*Continuation上公开更多TaskAPI, 或者允许在 continuation 中恢复Handle

TaskHandleAPI 对 handle 的持有者提供了任务状态的额外控制,特别是查询和设置取消状态,以及等待任务最终结果的能力。人们觉得为什么*Continuation类型不公开这些功能。Continuation的角色与Handle大不相同,handle 代表且控制整个任务的生命周期,而 continuation 只代表任务生命周期中的单个挂起点。而且,*ContinuationAPI 主要设计用来允许与 Swift 中结构化并发模型之外的代码进行通信,任务之间的交互最好尽可能在该模型内处理。

注意*Continuation本身也不需要支持任何任务 API。例如,某人希望某个任务在响应回调时取消其本身,他们可以通过在continuation的 resume 类型(例如可选的nil)插入哨兵来实现这一点:

代码语言:swift
复制
let callbackResult: Result? = await withUnsafeContinuation { c in
  someCallbackBasedAPI(
    completion: { c.resume($0) },
    cancellation: { c.resume(nil) })
}

if let result = callbackResult {
  process(result)
} else {
  cancel()
}

提供立即恢复任务的 API,避免"队列跳转"

有些 API 除了接受 completion handler 和代理外,也允许程序控制在哪里调用 completion handler 和代理。例如,Apple 平台上的某些 API 为应该调用 completion handler 的调度队列使用参数。在这些情况下,如果原始的 API 能够在调度队列上(无论生命调度机制,比如线程或者 run loop)直接恢复任务,这是最佳场景,任务的执行器也会继续执行该任务。

为了做到这点,我们提供with*Continuation的一个变体,除了提供 continuation,还提供任务期望在其上恢复执行的调度队列。with*Continuation类型会提供一组unsafeResumeImmediately API,这些 API 会在当前线程上立即回恢复当前任务的执行。它们有可能是这样:

代码语言:swift
复制
// Given an API that takes a queue and completion handler:
func doThingAsynchronously(queue: DispatchQueue, completion: (ResultType) -> Void)

// We could wrap it in a Swift async function like:
func doThing() async -> ResultType {
  await withUnsafeContinuationAndCurrentDispatchQueue { c, queue in
    // Schedule to resume on the right queue, if we know it
    doThingAsynchronously(queue: queue) {
      c.unsafeResumeImmediately(returning: $0)
    }
  }
}

这种 API 必须很小心地使用,程序员也要很小心检查是否在正确的上下文中调用unsafeResumeImmediately,并且在一段可能的无限时间内,从调用者中接管当前线程的控制权是安全的。如果在错误的上下文中执行任务,它会破坏当前已有代码,编译器和运行时所做的全部假设,最终导致错误很难调试。如果发现基于 continuation 适配器的"队列跳转"在实践中被证明是一个性能问题,我们可以将其作为核心提议的补充来研究。

修改记录

第三次修改:

  • 使用单个Continuation<T, E: Error>类型代替单独的*Continuation<T>*ThrowingContinuation<T>类型,Continuation<T, E: Error>类型带有 Error 类型。
  • 为 continuation 增加resume()方法,该方法相当于resume(returning: ())方法,返回值为Void类型。
  • with*ThrowingContinuation增加operationblock,该 block 有可能会抛出异常,如果从操作中传出了未捕获的错误,block 会立即恢复抛出错误的任务往下执行。

第二次修改:

  • 描述清楚with*Continuation*Continuation.resume的执行行为,即在挂起任务之前,with*Continuation会立即在当前上下文中执行其操作参数,再取消挂起任务后,对应的resume会立即返回它的调用方,任务将会由它的执行者调度。
  • 删除了一个在必须调用resume时不必要的不变量;在with*Continuation操作开始执行后的任何一个时间点,仅能有效调用一次resume;当with*Continuation操作返回时,不需要精确地调用resume
  • 增加"未来方向"小节讨论一个可能的更高级 API,该 API 允许 continuations 在知道正确的调度队列时直接恢复其任务。
  • 在返回Continuation类型上增加resume()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 动机
  • 提议的解决方案
  • 设计细节
    • 原始 unsafe continuations
      • Checked continuations
      • 其他例子
      • 备选方案
        • 将CheckedContinuation命名为Continuation
          • 不要公开UnsafeContinuation
            • 让CheckedContinuation捕获所有误用, 或者记录所有误用
              • 在*Continuation上公开更多TaskAPI, 或者允许在 continuation 中恢复Handle
                • 提供立即恢复任务的 API,避免"队列跳转"
                • 修改记录
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档