任务 | Task

便利产生和等待任务。

任务是指在整个生命周期内执行特定操作的进程,通常很少或根本不与其他进程通信。最常见的任务用例是通过异步计算值将顺序代码转换为并发代码:

task = Task.async(fn -> do_some_work() end)
res  = do_some_other_work()
res + Task.await(task)

async如上面的例子所示,可以通过调用者进程(以及唯一的调用者)等待产生的任务。它们是通过产生一个进程来实现的,一旦给定的计算被执行,该进程就向调用者发送消息。

除了async/1await/2,任务也可以作为监视树的一部分启动,并在远程节点上动态生成。接下来我们将探讨这三种情况。

异步等待

任务的常见用途之一是将顺序代码转换为并发代码。Task.async/1同时保持它的语义。调用时,调用方将创建、链接和监视新进程。任务操作完成后,将向调用方发送一条消息,并得到结果。

Task.await/2用于读取任务发送的消息。

在使用async*

  • 如果使用异步任务,则必须等待他们的答复发送了。如果您不期望收到答复,请考虑使用Task.start_link/1详见下文。
  • 异步任务链接调用者和生成的进程。这意味着,如果调用者崩溃,任务也会崩溃,反之亦然。这是有意的:如果接收结果的过程不再存在,那么完成计算就没有意义了。

如果不想这样做,请使用Task.start/1或考虑在Task.Supervisor使用async_nolinkstart_child...

Task.yield/2await/2调用者暂时阻止的地方的替代方案,等待任务应答或崩溃。如果结果未在超时范围内到达,则可以在稍后再次调用结果。这允许多次检查任务的结果。如果答复没有在期望的时间内到达,Task.shutdown/2可以用来停止任务。

监督任务

还可以在主管的领导下生成任务。它通常通过在自己的模块中定义任务来完成:

defmodule MyTask do
  use Task

  def start_link(arg) do
    Task.start_link(__MODULE__, :run, [arg])
  end

  def run(arg) do
    # ...
  end
end

然后把它递给主管:

Supervisor.start_link([MyTask])

由于这些任务是受监督的,并且不与呼叫者直接相关,所以不能等待。请注意start_link/1,不像async/1返回{:ok, pid}(这是主管预期的结果)。

Note use Task定义了一个child_spec/1函数,允许将定义的模块置于监督树下。生成的child_spec/1可以使用以下选项进行自定义:

  • :id-子规范id,不符合当前模块
  • :start- 如何启动子进程(默认为调用__MODULE__.start_link/1
  • :restart-当应该重新启动孩子时,默认为:temporary
  • :shutdown-如何关闭孩子

相反GenServerAgent并且Supervisor,一个任务有一个默认:restart:temporary。这意味着即使任务崩溃,任务也不会重新启动。如果您希望因存在不成功而重启任务,请执行以下操作:

use Task, restart: :transient

如果希望始终重新启动任务:

use Task, restart: :permanent

Supervisor有关更多信息的文档。

动态监督任务

Task.Supervisor模块允许开发人员动态创建多个监督任务。

一个简短的例子是:

{:ok, pid} = Task.Supervisor.start_link()
task = Task.Supervisor.async(pid, fn ->
  # Do something
end)
Task.await(task)

但是,在大多数情况下,您想要将任务主管添加到您的监督树中:

Supervisor.start_link([
  {Task.Supervisor, name: MyApp.TaskSupervisor}
])

现在您可以动态地启动受监督的任务:

Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  # Do something
end)

甚至使用异步/等待模式:

Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
  # Do something
end) |> Task.await()

最后,检查Task.Supervisor其他支持的操作。

分布式任务

由于Elixir提供了一个任务管理器,因此很容易使用它来动态生成跨节点的任务:

# On the remote node
Task.Supervisor.start_link(name: MyApp.DistSupervisor)

# On the client
Task.Supervisor.async({MyApp.DistSupervisor, :remote@local},
                      MyMod, :my_fun, [arg1, arg2, arg3])

请注意,使用分布式任务时,应该使用Task.Supervisor.async/4需要显式模块,函数和参数的函数,而不是Task.Supervisor.async/2使用匿名函数的函数。这是因为匿名函数期望在所有涉及的节点上存在相同的模块版本。检查Agent模块文档以获取有关分布式过程的更多信息,因为其中描述的限制适用于整个生态系统。

类型

t()

功能

%Task{}

任务结构

async(fun)

开始一项必须等待的任务

async(mod, fun, args)

开始一项必须等待的任务

async_stream(enumerable, fun, options \ [])

返回运行给定函数的流。fun中的每个项目并发enumerable

async_stream(enumerable, module, function, args, options \ [])

返回运行给定module,,,function,和args中的每个项目并发enumerable

await(task, timeout \ 5000)

等待任务答复并返回它。

shutdown(task, shutdown \ 5000)

取消链接并关闭任务,然后检查是否有答复。

start(fun)

开始一项任务

start(mod, fun, args)

开始一项任务

start_link(fun)

启动链接到当前进程的进程。

start_link(mod, fun, args)

作为监督树的一部分启动任务。

yield(task, timeout \ 5000)

临时阻止当前进程等待任务答复。

yield_many(tasks, timeout \ 5000)

在给定时间间隔内产生多个任务。

t()

t() :: %Task{owner: term, pid: term, ref: term}

%Task{} (struct)

任务结构。

它包含以下字段:

  • :pid-任务过程的PID;nil如果任务不使用任务进程
  • :ref-任务监控器参考资料
  • :owner-启动任务的进程的PID异步%28乐趣%29async((() -> any)) :: t开始一项必须等待的任务。此函数生成一个进程,该进程链接到调用进程并由其监视。阿Task返回包含相关信息的struct。阅读Task模块文档,以获取有关async/1async/3...另见async/3...异步%28 mod,乐趣,args%29async(module, atom, [term]) :: t开始一项必须等待的任务。阿Task返回包含相关信息的struct。开发人员必须最终调用Task.await/2Task.yield/2紧随其后Task.shutdown/2返回的任务。阅读Task模块文档,以获取有关async/1async/3...链接此函数生成一个进程,该进程链接到调用进程并由其监视。链接部分很重要,因为如果父进程死亡,它将中止任务。它还保证了在添加异步调用之后异步/等待具有相同属性之前的代码。例如,假设您拥有以下内容:x = heavy_fun() y = some_fun() x + y现在你想让heavy_fun()异步:x = Task.async(&heavy_fun/0) y = some_fun() Task.await(x) + y和以前一样,如果heavy_fun/0如果失败,整个计算将失败,包括父进程。如果不希望任务失败,则必须更改heavy_fun/0如果没有异步调用,代码就会以同样的方式实现。例如,返回{:ok, val} | :error结果,或者,在更极端的情况下,使用try/rescue换言之,异步任务应该被看作是进程的扩展,而不是将其与所有错误隔离的机制。如果不想将调用者链接到任务,则必须使用有监督的任务Task.Supervisor打电话Task.Supervisor.async_nolink/2...在任何情况下,避免下列任何一项:
  • 设置:trap_exittrue-只有在特殊情况下才能使用捕获出口,因为它将使您的进程对任务的退出和任何其他进程的退出都免疫。

而且,即使当陷阱退出时,await如果任务已结束而未将其结果发回,则仍将退出。

  • 开始的任务进程断开链接。async/await如果您解除了进程的链接,并且该任务不属于任何主管,您可以在父进程死亡的情况下离开悬空任务。消息格式任务发送的答复将采用以下格式{ref, result},在哪里ref是由任务结构和result任务函数的返回值。异步[医]流%28可枚举、有趣、选项[医][]%29async_stream(Enumerable.t, (term -> term), keyword) :: Enumerable.t返回运行给定函数的流。fun中的每个项目并发enumerable...各enumerable项作为参数传递给给定的函数。fun并由自己的任务处理。任务将链接到当前流程,类似于async/1...例异步计算每个字符串中的代码点,然后使用Reduce将计数相加。iex> strings = ["long string", "longer string", "there are many of these"] iex> stream = Task.async_stream(strings, fn text -> text |> String.codepoints |> Enum.count end) iex> Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end) 47见async_stream/5用于讨论、选项和更多示例。异步[医]流%28可枚举,模块,函数,args,选项[医][]%29async_stream(Enumerable.t, module, atom, [term], keyword) :: Enumerable.t返回运行给定module,,,function,和args中的每个项目并发enumerable...每个项目都将优先于给定的args并由自己的任务处理。这些任务将链接到一个中间流程,然后再链接到当前流程。这意味着任务中的失败将终止当前进程,而当前进程中的失败将终止所有任务。当流化时,每个任务将发出{:ok, value}在成功完成后,或{:exit, reason}如果呼叫者正在陷阱出口。结果以与原始结果相同的顺序发出。enumerable...并发级别可以通过:max_concurrency选项和默认值为System.schedulers_online/0也可以将超时作为表示在没有任务答复的情况下等待的最大时间的选项。最后,考虑使用Task.Supervisor.async_stream/6在主管下开始任务。如果您发现自己捕获出口以处理异步流中的出口,请考虑使用Task.Supervisor.async_stream_nolink/6若要启动未链接到当前进程的任务,请执行以下操作。备选方案
  • :max_concurrency-设置同时运行的最大任务数。默认为System.schedulers_online/0...
  • :ordered-是否应按照输入流的相同顺序返回结果。当您有较大的流并且不希望在结果交付之前缓冲时,此选项非常有用。默认为true...
  • :timeout-允许每个任务执行的最大时间为%28毫秒%29毫秒。默认为5000...
  • :on_timeout-当任务超时时,该怎么办?可能的价值是:
-  `:exit` (default) - the process that spawned the tasks exits. 
-  `:kill_task` - the task that timed out is killed. The value emitted for that task is `{:exit, :timeout}`. 

让我们构建一个流,然后枚举它:

stream = Task.async_stream(collection, Mod, :expensive_fun, [])
Enum.to_list(stream)

属性可以增加或减少并发性。:max_concurrency选择。例如,如果任务很重,则可以增加值:

max_concurrency = System.schedulers_online * 2
stream = Task.async_stream(collection, Mod, :expensive_fun, [], max_concurrency: max_concurrency)
Enum.to_list(stream)

await(task, timeout \ 5000)

await(t, timeout) :: term | no_return

等待任务答复并返回它。

超时(以毫秒为单位)可以给出默认值为5000如果任务进程终止,此函数将以与任务相同的原因退出。

如果超过了超时,await将退出;但是,任务将继续运行。当调用进程退出时,它的退出信号将终止任务,如果它没有捕获退出。

此函数假定任务的监视器仍处于活动状态或监视器的:DOWN消息在消息队列中。如果它已经被妖魔化,或者消息已经收到,则此函数将等待等待消息的超时时间。

对于任何给定的任务,只能调用一次此函数。如果希望能够多次检查长期运行的任务是否已完成其计算,请使用yield/2相反。

与OTP行为的兼容性

不建议awaitOTP行为中长期运行的任务,如GenServer相反,您应该匹配来自您的任务的消息。GenServer.handle_info/2回调。

实例

iex> task = Task.async(fn -> 1 + 1 end)
iex> Task.await(task)
2

shutdown(task, shutdown \ 5000)

shutdown(t, timeout | :brutal_kill) ::
  {:ok, term} |
  {:exit, term} |
  nil

取消链接并关闭任务,然后检查是否有答复。

回报{:ok, reply}如果在关闭任务时收到答复,{:exit, reason}如果任务终止,则为nil...

关闭方法是超时或:brutal_kill。在a的情况下timeout:shutdown退出信号被发送到任务进程,并且如果在超时时间内没有退出,它将被终止。随着:brutal_kill任务马上被杀死。如果任务异常终止(可能被另一个进程终止),则该函数将以相同的原因退出。

终止调用者时不需要调用此函数,除非出于原因退出。:normal或者如果任务是陷阱退出。如果调用者出于其他原因退出,则为:normal任务没有捕获出口,调用者的退出信号将停止该任务。呼叫者可以有理由退出。:shutdown若要关闭其所有链接进程(包括任务),这些进程不会在不生成任何日志消息的情况下捕获退出。

如果任务的监视器已被妖魔化或接收,并且消息队列中没有等待响应,此函数将返回{:exit, :noproc}由于结果或退出原因无法确定。

start(fun)

start((() -> any)) :: {:ok, pid}

开始一项任务。

这仅在任务用于副作用时使用(即对返回结果不感兴趣),并且不应链接到当前进程。

start(mod, fun, args)

start(module, atom, [term]) :: {:ok, pid}

开始一项任务。

这仅在任务用于副作用时使用(即对返回结果不感兴趣),并且不应链接到当前进程。

start_link(fun)

start_link((() -> any)) :: {:ok, pid}

启动链接到当前进程的进程。

这通常用于作为监督树的一部分启动流程。

start_link(mod, fun, args)

start_link(module, atom, [term]) :: {:ok, pid}

作为监督树的一部分启动任务。

yield(task, timeout \ 5000)

yield(t, timeout) :: {:ok, term} | {:exit, term} | nil

暂时阻止当前进程等待任务答复。

返回{:ok, reply}是否收到答复,nil如果没有答复到达,或者{:exit, reason}任务已经退出。请记住,通常任务失败也会导致拥有该任务的进程退出。因此这个函数只能在返回{:exit, reason}时才返回

  • 任务进程退出的原因:normal
  • 它没有链接到呼叫者
  • 呼叫者正在捕获出口。

超时(以毫秒为单位)可以给出默认值为5000如果在收到来自任务的消息之前时间已满,则此函数将返回。nil监视器将保持活动状态。因此yield/2可以在同一任务上多次调用。

此函数假定任务的监视器仍处于活动状态或监视器的:DOWN消息在消息队列中。如果它已被妖魔化或消息已经收到,则此函数将等待超时时间等待消息。

如果您打算关闭任务,如果任务内部没有响应timeout毫秒,你应该把它和shutdown/1,就像这样:

case Task.yield(task, timeout) || Task.shutdown(task) do
  {:ok, result} ->
    result
  nil ->
    Logger.warn "Failed to get a result in #{timeout}ms"
    nil
end

如果任务在timeout但在此之前shutdown/1已被调用,您仍将得到结果,因为shutdown/1用于处理这种情况并返回结果。

yield_many(tasks, timeout \ 5000)

yield_many([t], timeout) :: [{t, {:ok, term} | {:exit, term} | nil}]

在给定的时间间隔内产生多个任务。

此函数接收任务列表,并在给定的时间间隔内等待它们的答复。它返回两个元素的元组列表,任务作为第一个元素,生成的结果作为第二个元素。

类似于yield/2,每个任务的结果将是

  • {:ok, term}如果任务已在给定时间间隔内成功地报告其结果
  • {:exit, reason}如果任务已经结束
  • nil如果任务一直运行超过超时时间

查帐yield/2想了解更多信息。

Task.yield_many/2允许开发人员生成多个任务,并在给定的时间范围内检索收到的结果。如果我们把它和Task.shutdown/2,它使我们能够收集这些结果,并取消尚未及时答复的任务。

让我们看看一个例子。

tasks =
  for i <- 1..10 do
    Task.async(fn ->
      Process.sleep(i * 1000)
      i
    end)
  end

tasks_with_results = Task.yield_many(tasks, 5000)

results = Enum.map(tasks_with_results, fn {task, res} ->
  # Shutdown the tasks that did not reply nor exit
  res || Task.shutdown(task, :brutal_kill)
end)

# Here we are matching only on {:ok, value} and
# ignoring {:exit, _} (crashed tasks) and `nil` (no replies)
for {:ok, value} <- results do
  IO.inspect value
end

在上面的例子中,我们创建了睡眠时间为1到10秒的任务,并返回了他们睡眠的秒数。如果您同时执行代码,您应该会看到1到5打印出来,因为这些任务在给定的时间内都有答复。所有其他任务都将使用Task.shutdown/2打电话。

扫码关注云+社区

领取腾讯云代金券