前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >c#异步编程-Task(二)

c#异步编程-Task(二)

作者头像
JusterZhu
发布2022-12-07 18:23:12
2.5K0
发布2022-12-07 18:23:12
举报
文章被收录于专栏:JusterZhuJusterZhu

一、概要

大家好,本次继续分享自己的学习经历。本文主要分享Task异步编程内容,如果能帮助大家希望多多关注文章末尾的微信公众号和知乎三连。各位举手之劳是对我更新技术文章最大的支持。

二、详细内容

1.同步异步

  • 同步操作会在返回调用者之前完成它的工作
  • 异步操作会在返回调用者之后去做它的工作
    • 异步的方法更为少见,会启用并发,因为他的工作会与调用者并行执行
  • 目前见到的大部分的异步方法都是通用目的的:
    • Thread.Start
    • Task.Run
    • 可以将continuation附加到Task的方法

什么是异步编程

  • 异步编程的原则是将长时间运行的函数写成异步的。
  • 传统做法是将长时间运行的函数写成同步的,然后从新的线程或Task进行调用从而按需引入并发。
  • 上述异步方式的不同之处在于,它是长时间运行函数的内部启动并发。有这两点好处
    • IO-bound并发可不适用线程来实现。可提供可扩展性和执行效率;
    • 富客户端在worker线程会使用更少的代码,简化了线程安全性。

异步编程的两种用途

调用图(call graph)

  • 编写高效处理大量并发IO的应用程序(典型的:服务器端应用)
    • 挑战并不是线程安全(因为共享状态通常是最小化的),而是执行效率
    • 特别的,每个网络请求并不会消耗一个线程。
  • 调用图
  • 在富客户端应用里简化线程安全。
    • 如果调用图中任何一个操作时长时间运行的,那么整个call graph必须运行在worker线程上,以保证UI响应。
    • 得到一个横跨多个方法的单一并发操作;
    • 需要为call graph中的每个方法考虑线程安全。
    • 异步的call graph,只要需要才开启一个线程,通常较浅(IO-bound操作完全不需要)
    • 其他的方法可以在UI线程执行,线程安全简化。
    • 并发的粒度适中:-一连串小的并发操作,操作之间会弹回到UI线程

经验之谈

为了获得上述好处,下列操作建议异步编写:

  • IO-bound和compute-bound操作
  • 执行超过50毫秒的操作
  • 另一方面过细的粒度会损害性能,因为异步操作也有开销。

注:

  • IO-bound(I/O密集型)表示:指的是系统的CPU效能相对硬盘/内存的效能要好很多,此时,系统运作,大部分的状况是 CPU 在等 I/O (硬盘/内存) 的读/写,此时 CPU Loading 不高。
  • Compute-bound(计算密集型)表示: 指的是系统的 硬盘/内存 效能 相对 CPU 的效能 要好很多,此时,系统运作,大部分的状况是 CPU Loading 100%,CPU 要读/写 I/O (硬盘/内存),I/O在很短的时间就可以完成,而 CPU 还有许多运算要处理,CPU Loading 很高。在多重程序系统中,大部分时间用来做计算、逻辑判断等CPU动作的程序。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部份时间用在三角函数和开根号的计算,便是属于CPU bound的程序。

2.异步和Coninuation以及语言的支持

  • Task非常适合异步编程,因为他们支持Continuation(它对异步非常重要)
  • TaskCompletionSource是实现底层IO-bound异步方法的一种标准方式
  • 对于Compute-bound方法,Task.Run会初始化绑定线程的并发。
  • 把task返回调用者,创建异步方法;
  • 异步编程的区别:目标是在调用图较低的位置来这样做。
  • 富客户端应用中,高级方法可以保留在UI线程和访问控制以及共享状态上,不会出现线程安全问题

代码例子:

代码语言:javascript
复制
    //例子1,同步方法进行Compute-bound操作
    static void Main(string[] args)
    {
        DisplayCounts();
        //粗粒度异步调用
        //Task.Run(()=>{ DisplayCounts(); });
        Console.ReadKey();
    }

    static void DisplayCounts() 
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(GetPrimesCount(i*1000000 + 2,1000000) + "between" + (i * 1000000) + "and" + ((i + 1) * 1000000 - 1));
        }
        Console.WriteLine("Done!");
    }

    static int GetPrimesCount(int start , int count) 
    {
        return ParallelEnumerable.Range(start, count).Count(n=> Enumerable.Range(2,(int)Math.Sqrt(n)-1).All(i=>n%i>0));
    }
代码语言:javascript
复制
    //例子2,异步方法执行Compute-bound操作
    static void Main(string[] args)
    {
        //细粒度异步调用
        DisplayCounts();
        Console.ReadKey();
    }

    static void DisplayCounts() 
    {
        for (int i = 0; i < 10; i++)
        {
            var awaiter = GetPrimesCount(i * 1000000 + 2, 1000000).GetAwaiter();
            awaiter.OnCompleted(()=> 
            Console.WriteLine(awaiter.GetResult())
            );
        }
        Console.WriteLine("Done!");
    }

    static Task<int> GetPrimesCount(int start , int count) 
    {
        return Task.Run(()=> ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
    }

在例子1中同步执行是有序输出,例子2中的执行输出顺序是乱的且Done是最先输出出来的,这个结果并不是我们想要的接下来需要进行一些优化。

语言对异步的支持非常重要

  • 需要对task的执行序列化
  • 例如Task B依赖于 Task A的执行结果。
    • (例子)为此,必须在continuation内部触发下一次循环

代码示例:

代码语言:javascript
复制
    //例子3,有序异步执行
    static void Main(string[] args)
    {
        //这里的调用非完全异步
        DisplayCounts();
        Console.ReadKey();
    }

    static void DisplayCounts()
    {
        DisplayCountsFrom(0);
    }

    static void DisplayCountsFrom(int i) 
    {
        var awaiter = GetPrimesCount(i * 1000000 + 2, 1000000).GetAwaiter();
        awaiter.OnCompleted(() => 
        { 
            Console.WriteLine(awaiter.GetResult());
            if (++i < 10)
            {
                DisplayCountsFrom(i);
            }
            else
                Console.WriteLine("Done!");
        });
    }

    static Task<int> GetPrimesCount(int start, int count)
    {
        return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
    }


    //例子4,完全异步执行
    static void Main(string[] args)
    {
        DisplayPrimeCountsAsync();
        Console.ReadKey();
    }

    public static Task DisplayPrimeCountsAsync() 
    {
        var machine = new PrimesStateMachine();
        machine.DisplayCountsFrom(0);
        return machine.Task;
    }

    public static void DisplayCountsFrom(int i) 
    {
        var awaiter = GetPrimesCount(i * 1000000 + 2, 1000000).GetAwaiter();
        awaiter.OnCompleted(() => 
        { 
            Console.WriteLine(awaiter.GetResult());
            if (++i < 10)
            {
                DisplayCountsFrom(i);
            }
            else
                Console.WriteLine("Done!");
        });
    }

    public static Task<int> GetPrimesCount(int start, int count)
    {
        return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
    }
}

class PrimesStateMachine 
{
    TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();

    public Task Task { get { return _tcs.Task; } }

    public void DisplayCountsFrom(int i)
    {
        var awaiter = Program.GetPrimesCount(i * 1000000 + 2, 1000000).GetAwaiter();
        awaiter.OnCompleted(() =>
        {
            Console.WriteLine(awaiter.GetResult());
            if (++i < 10)
            {
                DisplayCountsFrom(i);
            }
            else
            {
                _tcs.SetResult(null);
                Console.WriteLine("Done!");
            }
        });
    }
}

以上的写法,依旧过于繁琐接下来通过异步关键字来进行下一步优化减少代码量。

代码语言:javascript
复制
    //例子5
    static async Task Main(string[] args)
    {
        await DisplayPrimeCountsAsync();
        Console.ReadKey();
    }

    public async static Task DisplayPrimeCountsAsync()
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(await GetPrimesCountAsync(i * 1000000 + 2,1000000) + "");
        }
        Console.WriteLine("Done");
    }

    public static Task<int> GetPrimesCountAsync(int start, int count)
    {
        return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
    }
  • async和await
    • 对于不想复杂的实现异步非常重要
  • 命令式循环结构不要和continuation混合在一起,因为它们依赖于当前本地状态。
  • 另一种实现,函数式写法(Linq查询),它也是响应式编程(Rx)的基础。ps:RX框架是Reactive Framework (Rx) ,它是一个异步通知的框架,有点类似观察者模式,只不过它是异步的,不会因为监听请求而阻塞通道

3.await async

async和await关键字可以让你写出和同步代码一样简介且结构相同的异步代码

await关键字简化了附加continuation(继续体)的过程。

结构如下:

代码语言:javascript
复制
var result = await expression;
statement(s);

它的作用相当于:

代码语言:javascript
复制
var awaiter = expression.GetAwaiter();
awaiter.OnCompleted(()=>{
    var result = await expression;
    statement(s);
})

async修饰符会让编译器把await当做关键字而不是修饰符(c#5以前可能会使用await作为标识符)

async 修饰符只能应用于方法(包括lambad表达式)。

  • 该方法可以返回void、Task、Task

async 修饰符对方法的签名或public元数据没有影响(和unsafe一样),它只会影响方法内部。

  • 在几口内使用async是没有意义的
  • 使用async来重载飞async的方法却是合法的(只要方法签名一致)

使用了async修饰符的方法就是“异步函数”。

异步方法如何执行

  • 遇到await表达式,执行(正常情况下)会返回调用者
    • 为保证task结束时,实现会跳回原方法,从停止的地方继续执行。
    • 就像iterator里面的yield return。
    • 在返回前,运行时会附加一个continuation到await的task
  • 如果发生故障,那么异常会被重新抛出
  • 如果一切正常,那么它的返回值就会赋给await表达式

可以await哪些?

  • await的表达式通常是一个task
  • 也可以满足下列条件的任意对象:
    • 有GetAwaiter方法,它返回一个awaiter(实现了INotifyCompletion.OnCompleted接口)
    • 返回适当类型的GetResult方法
    • 一个bool类型的IsCompleted属性

捕获本地状态

  • await表达式的最牛之处就是它几乎可以出现在任何地方。
  • 特别的,在异步方法内,await表达式可以替换任何表达式。
    • 除了lock表达式和unsafe上下文

await之后在哪个线程上执行

  • 在await表达式之后,编译器依赖于continuation(通过awaiter模式)来继续执行。
  • 如果在富客户端应用的UI线程上,同步上下文会保证后续是在源线程上执行;
  • 否则,就会在task结束的线程上继续执行。

UI上的await

代码语言:javascript
复制
//WPF示例非异步代码1
<Grid>
    <Grid.RowDefinitions>
        <RowDefinition  Height="25"></RowDefinition>
        <RowDefinition></RowDefinition>
    </Grid.RowDefinitions>
    <Button Content="ok" VerticalAlignment="Top" Click="Button_Click_1"></Button>
    <DockPanel x:Name="myPanel" Grid.Row="1">

    </DockPanel>
</Grid>


public partial class MainWindow : Window
{
    TextBlock textBlock;

    public MainWindow()
    {
        InitializeComponent();
        textBlock = new TextBlock();
        myPanel.Children.Add(textBlock);
    }

    void Go() 
    {
        for (int i = 1; i < 5; i++)
        {
            textBlock.Text += GetPrimesCount(i * 1000000 + 2, 1000000) + "" + Environment.NewLine; 
        }
    }

    int GetPrimesCount(int start, int count) 
    {
        return ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0));
    }

    private void Button_Click_1(object sender, RoutedEventArgs e)
    {
        Go();
    }
}

//WPF示例异步代码2
//将示例1中的GetPrimesCount方法修改为异步则不会阻塞UI
public static Task<int> GetPrimesCountAsync(int start, int count)
{
    return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
}
  • 本示例中,只有GetPrimesCountAsync中的代码在worker线程上运行
  • Go中的代码会“租用”UI线程上的时间
  • 可以说:Go是在消息循环中“伪并发”的执行
    • 这其实简化了线程安全,防止重新进入即可
    • 也就是说:它和UI线程处理的其他时间是穿插执行的
    • 因为这种伪并发,唯一能发生“抢占” 的时刻就是在await期间
  • 这种并发发生在调用栈较浅的地方(Task.Run调用的代码里)
  • 为了从该模型获益,真正的并发代码要避免访问共享状态或UI控件。

伪代码:

代码语言:javascript
复制
    为本线程设置同步上下文(基于WPF)
    while(!线程结束)
    {
        等着消息队列中发生一些事情
        如果发生了事情,看看是哪种消息?
        如果是键盘/鼠标消息->触发 event handeler
        如果是用户BeginInvoke/Invoke 消息->执行委托
    }
  • 附加到UI元素的Event handler 通过消息循环执行
  • 因为在UI线程上await,continuation将发送到同步上下文上,该同步上下文通过消息循环执行,来保证整个Go方法伪并发在UI线程上执行。

与粗粒度的并发相比

代码语言:javascript
复制
public partial class MainWindow : Window
{
    TextBlock textBlock;

    public MainWindow()
    {
        InitializeComponent();
        textBlock = new TextBlock();
        myPanel.Children.Add(textBlock);
    }

    void Go() 
    {
        for (int i = 1; i < 5; i++)
        {
            textBlock.Text += GetPrimesCount(i * 1000000 + 2, 1000000) + "" + Environment.NewLine; 
        }
    }

    int GetPrimesCount(int start, int count) 
    {
        return ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0));
    }

    private void Button_Click_1(object sender, RoutedEventArgs e)
    {
        //这里的Task是粗粒度调用,将go这种同步方法统一都放到了worker线程中执行。语法看起来好像并没有任何坏处其实会引用race condition
        Task.Run(()=> Go());
    }

    public static Task<int> GetPrimesCountAsync(int start, int count)
    {
        return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
    }
}
  • 例如使用BackgroundWorker(即是粗粒度并发。例子,Task.Run)
  • 整个同步调用图都在worker线程上
  • 必须在代码中到处使用Dispatcher.BeginInvoke
  • 循环本身在worker线程上
  • 引入了race condition(线程竞争条件)
  • 若实现取消或过程报告,会导致线程安全问题更容易发生,在方法中添加任何的代码也是同样的效果

4.编写异步函数

  • 对于任何异步函数,你可以使用Task替代void作为返回类型,让该方法成为更有效的异步(可以进行await)。

示例代码:

代码语言:javascript
复制
    public void Go(){}
    public async Task Go(){}
  • 并不需要在方法体中显式的返回Task。编译器会生成一个Task(当方法完成或发生异常时),这使得创建异步的调用链非常方便。

示例代码:

代码语言:javascript
复制
    public async Task Go(){ //这里不要return,但必须有await的方法在Go的函数体内 }
  • 编译器会对返回Task的异步函数进行扩展,使其成为发送信号或发生故障时使用TaskCompletionSource来创建Task的代码。

示例代码:

代码语言:javascript
复制
    //编译器层将会处理以下代码实现
    Task Do() 
    {
        var tcs = new TaskCompletionSource<object>();
        var awaiter = Task.Delay(5000).GetAwaiter();
        awaiter.OnCompleted(()=> 
        {
            try
            {
                awaiter.GetResult();
                int answer = 21 * 2;
                tcs.SetResult(null);
                Console.WriteLine(answer);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });
        return tcs.Task;
    }
  • 因此,当返回Task的异步方法结束时,执行就会跳回到对他进行await的地方。(这个过程通过continuation来实现)

编写异步函数-富客户端场景下

  • 富客户端场景下,执行在此刻会跳回到UI线程(如果目前不在UI线程的话)
  • 否则,就在continuation返回的任意线程上继续执行。
  • 这意味着,在异步滴啊哦哟图中向上冒泡的时候,不会发生延迟成本,除非是UI线程启动的第一次“反弹”。

非void返回类型的方法,返回Task

如果方法体返回TResult,那么异步方法就可以返回Task。

代码语言:javascript
复制
async Task<int> Get()
{
    await Task.Delay(5000);
    int anwser = 21 * 2;
    return anwser;
}

其原理就是给TaskCompletionSource发送的信号带有值,而不是null

代码语言:javascript
复制
async Task Do()
{
    //调用时加上await返回的值则是int型
    int anwser = await Get();
    //调用时不加上await返回的值则是Task<int>型
    Task<int> anwser = Get();
    Console.WriteLine(anwser);
}

async Task<int> Get()
{
    await Task.Delay(5000);
    int anwser = 21 * 2;
    return anwser;
}

与同步编程很相似,微软官方就是这么设计的。

c#中如何设计异步函数

  • 以同步的方式编写方法
  • 使用异步调用来代替同步调用,并且进行await
  • 除了顶层方法外(UI控件的Event handler),把你方法的返回类型升级为(返回void的类型升级为)Task或(非void的类型升级为)Task ,这样他们就可以进行await了。

编译器能对异步函数生成Task意味着什么?

  • 大多数情况下,你只需要在初始化IO-bound并发的底层方法里显式的初始化TaskCompletionSource,这种情况很少见。
  • 针对初始化Compute-bound的并发方法,你可以使用Task.Run来创建Task。

异步调用图的执行

整个执行与之前的同步例子中调用图执行的顺序一样,因为我们对每个异步函数的调用都进行了await。

在调用图中创建了一个没有并行和重叠的连续流。

每个await在执行中都创建了一个间隙,在间隙后,程序可以从中断处恢复执行。

代码语言:javascript
复制
async Task Main()
{
   //这一整个task调用链都是在主线程同步执行
   //对所有的异步方法进行await,达到对所有方法同步调用的效果
   await Go();//main thread
}

async Task Go() 
{
    var task = Doit();
    await task;
    Console.WriteLine("done");
}

async Task Doit() 
{
    var task = GetAnswer();
    int answer = await task;
    Console.WriteLine(answer);
}

async Task<int> GetAnswer() 
{
    var task = Task.Delay(5000);
    await task;
    int answer = 21 * 2;
    return answer;
}

并行(parallelism)

不使用await来调用异步函数会导致并行执行的发生。

例如:_button.Click +=(sender,args)=>Go();

  • 确实也能满足保持UI响应的并发要求

同样,可以并行跑两个操作:

代码语言:javascript
复制
var task1 = GetAnswer();
var task2 = GetAnswer();
await task1;
await task2;

异步Lambad表达式

匿名方法(包括Lambda表达式),通过使用async也可以编程异步方法。

调用方式也一样。

代码语言:javascript
复制
async Task Main() 
{
    Func<Task> unnamed = async () => 
    {
        await Task.Delay(1000);
        Console.WriteLine("FOO");
    };
    await unnamed();
    await NamedMethod();
}

async Task NamedMethod() 
{
    await Task.Delay(1000);
    Console.WriteLine("Foo");
}

附加event handler的时候也可以使用异步Lambda表达式

异步的Lambda表达式也可以返回Task。

代码语言:javascript
复制
//wpf中按钮的触发的时间
private async void Button_Click_1(object sender, RoutedEventArgs e)
{
    await Task.Run(()=> Go());
}

异步中的同步上线文

发布异常

富客户端应用通常依赖于几种的异常处理时间来处理UI线程上未捕获的异常。

  • 例如wpf中的Application.DispatcherUnhandledException
  • Asp.Net Core中的定制ExceptionFilterAttribute也是差不多的效果

其中内部原理就是:通过他们在自己的Try/Catch块来调用UI时间(在ASP.NET Core里就是页面处理的方法管道)

顶层的异步方法会使事情更加复杂,在这里Button_Click()是顶层方法因为没有再await它的地方了,所以它的返回类型是void就可以了。但当该方法被触发时下面声明的Exception则不会被发现。

代码语言:javascript
复制
private async void Button_Click_1(object sender, RoutedEventArgs e)
{
    await Task.Run(()=> Go());
    throw new Exception("will this be ignored?");
}

当点击按钮,event handler运行时,在await后,执行会正常的返回到消息循环1秒钟之后抛出的异常无法被消息循环中的catch块捕获。

为了缓解该问题,AsyncVoidMethodBuilder会捕获未处理的异常(在返回void的异步方法里),并把它们发布到同步上下文(如果出现的话),以确保全局异常处理时间能够触发。

注意

编译器只会把上述逻辑应用于返回类型为void的异步方法。

如果ButtonClick的返回类型是Task,那么未处理的异常将导致结果Task出错,然后Task无处可去(导致未观察到的异常出现)

一个有趣的细微差别:无论你在await前面还是后面抛出异常,都没有区别。

因此,下例中,异常会被发布到同步上下文(如果出现的话),而不会发布给调用者。

  • async void Foo(){ throw null; await Task.Delay(1000); }
  • 如果同步上下文没有出现,异常将会在线程池上传播,从而终止应用程序。

不直接将异常抛出回调用者的原因是为了确保可预测性和一致性。

在下例中,不管SomeCondition是什么值,InvalidOperationException将始终得到和导致Task出错同样的效果

代码语言:javascript
复制
async Task Foo()
{
 if (someCondition) await Task.Delay(100);
    throw new InvalidOperationException();
}

iterator 也是一样的:IEnumerable Foo(){ throw null; yield return 123; }

  • 本例中,异常绝不会直接返回给调用者,直到序列被遍历后,才会抛出异常。

OperationStarted 和 OperationCompleted

  • 如果存在同步上下文,返回void的异步函数也会在进入函数式调用其perationStarted方法,在函数完成时调用其OperationCompleted方法
  • 如果是为了返回void的异步方法进行单元测试而编写一个自定义的同步上下文,那么重写这两个方法确实很有用。

5.优化同步完成

异步函数可以在await之前就返回。例子

代码语言:javascript
复制
static async Task Main(string[] args)
{
    Console.WriteLine(await GetWebPageAsync("http://baidu.com"));
}

static Dictionary<string, string> _cache = new Dictionary<string, string>();

static async Task<string> GetWebPageAsync(string uri) 
{
    string html;
    if (_cache.TryGetValue(uri, out html))
    {
        return html;
    }
    return _cache[uri] = await new WebClient().DownloadStringTaskAsync(uri);
}

如果URI在缓存中存在,那么不会有await发生,执行就会返回给调用者,方法会返回一个已经设置信号的Task,这就是同步完成。

当await同步完成的Task时,执行不会返回到调用者,也不同通过Continuation跳回。它会;立即执行到下个语句。

编译器是通过检查awaiter上的IsCompleted属性来实现这个优化的。也就是说无论何时,当你await的时候:

  • Console.WriteLine(await GetWebPageAsync("http://baidu.com"));

如果是同步完成,编译器会释放可短路Continuation的代码,

代码语言:javascript
复制
    var awaiter = GetWebPageAsync().GetAwaiter();
    if (awaiter.IsCompleted)
    {
        Console.WriteLine(awaiter.GetResult());
    }
    else
    {
        awaiter.OnCompleted(() => Console.WriteLine(awaiter.GetResult()));
    }

注意

对一个同步返回的异步方法进行await,任然会引起一个小的开销(20纳秒左右)

反过来,跳回线程池,会引入上下文切换开销,可能是1-2毫秒

而跳回到UI的消息循环,至少是10倍开销(如果UI繁忙,那时间更长)

编写完全没有await的异步方法也是合法的,但是编译器会发出警告

但这类方法可以用于重载virtual/abstract方法

另外一种可以达到相同结果的方式是:使用Task.FromResult,它会返回一个已经设置好信号的Task。

代码语言:javascript
复制
Task<string> Foo(){ return Task.FromResult("bbb"); }

如果是从UI线程上调用,那么GetWebPageAsync方法是隐式线程安全的。可以连续调用它(从而启动多个并发下载),并且不需要lock来保护缓存。

有一种简单的方法可以实现这一点,而不必求助于lock或信令结构。我们创建一个“futures”(Task)的缓存,而不是字符串的缓存。注意并没有async:

代码语言:javascript
复制
static Dictionary<string, string> _cache2 = new Dictionary<string, string>();

static async Task<string> GetWebPageAsync2(string uri)
{
    if (_cache.TryGetValue(uri, out var downloadTask))
    {
        return downloadTask;
    }
    return _cache[uri] = await new WebClient().DownloadStringTaskAsync(uri);
}

lock的不是下载过程,lock的是检查缓存的过程(很短暂),这个过程不影响并发

代码语言:javascript
复制
    lock (_cache2)
    {
        if (_cache2.TryGetValue(uri,out var downloadTask))
        {
            return downloadTask;
        }
        else
        {
            return _cache2[uri] = new WebClient().DownloadStringTaskAsync(uri);
        }
    }

ValueTask

  • ValueTask 用于为优化场景,您可能永远不需要编写返回此类型的方法。
  • Task 和Task是引用类型,实例化它们需要基于堆的内存分配和后续的收集
  • 优化的一种极端形式是编写无需分配此类内存的代码;换句话说,这不会实例化任何引用类型,不会给垃圾收集增加负担。
  • 为了支持这种模式,c#引入了ValueTask和ValueTask 这两个struct,编译器允许使用他们替代Task和Task
    • async ValueTask Foo(){…}
  • 如果是同步完成,则await ValueTask 是无分配的。
    • int answer = await Foo();//可能是无分配的
  • 如果操作不是同步完成的,ValueTask 实际上就会创建一个普通的Task(并将await转发给它)
  • 使用AsTask方法,可以把ValueTask 转化为Task(也包括非泛型版本)

使用ValueTask注意事项

  • ValueTask 并不常见,它的出现纯粹是为了性能。
  • 这意味着她被不恰当的值类型语义所困扰,这可能会导致意外。为避免错误行为,必须避免以下情况。
    • 多次await同一个ValueTask
    • 操作没结束的时候就调用GetAwaiter().GetResult()
  • 如果你需要进行这些操作,那么先调用AsTask方法,操作它返回的Task。
  • 为了避免上述现金最简单的办法就是直接await方法调用:
    • await Foo();
  • 将ValueTask赋给变量时,可能引发错误了:
    • ValueTask valueTask = Foo();
  • 将其立即转化为普通的Task,就可以避免此类错误的发生:
    • Task valueTask = Foo().AsTask();

避免过度的弹回

对于在循环中多次调用的方法,通过调用ConfigureAwait方法,就可以避免重复的弹回到UI消息循环所带来的的开销。

这强迫Task不把continuation弹回给同步上下文。从而将开销削减到接近上下文切换的成本(如果您await的方法同步完成,则开销会小得多):

代码语言:javascript
复制
async void A() { await B(); };

async Task B() 
{
    for (int i = 0; i < 1000; i++)
    {
        await C().ConfigureAwait(false);
    }
}

async Task C() { ... }

这意味着对于方法B和C,我们取消了UI线程中简单线程安全模型,即代码在UI线程上运行,并且只能在await语句期间被抢占。但是,方法A不收影响,如果在一个UI线程上启动,它将保留在UI线程上。

这种优化在编写库时特别重要:您不需要简化线程安全性带来的好处,因为您的代码通常不与调用方共享状态,也不访问UI控件。

6.取消 cancellation

使用取消标志来实现对并发进行取消,可以封装一个类:

代码语言:javascript
复制
//语法定义
class CancellationToken
{public void IsCancellationRequested { get; private set; }

public void Cancel(){ IsCancellationRequested = true; }

public void ThrowIfCancellationRequested()
{ 
    if(IsCancellationRequested)
         throw new OperationCanceledExcption();
}

}

//调用代码
async Task Foo(CancellationToken cancellationToken)
{
    for(int i = 0; i < 10; i++)
    {
        Console.WriteLine(i);
        await Task.Delay(1000);
        cancellationToken.ThrowIfCancellationRequested();
    }
}

当调用者想取消的时候,它调用CancellationToken上的Cancel方法。这就会把IsCancellationRequested设置为true,即会导致短时间后Foo会通过OperationCanceledException引发错误。

CancellationToken 和 CancellationTokenSource

  • 先不管线程安全(应该再读写IsCancellationRequested时进行lock),这个模式非常有效,CLR也提供了一个CancellationToken类,它的功能和前面的例子类似。
  • 但是他缺少一个Cancel方法,Cancel方法在另外一个类上进行暴露:
  • 这种分离的设计是出于安全考虑:只能对CancellationToken访问的方法可以检查取消,但是不能实例化取消。

获取CancellationToken

想获得取消标志(cancellation token),先实例化CancellationTokenSource:

代码语言:javascript
复制
var cancelSource = new CancellationTokenSource();

这会暴露一个token属性,它会返回一个cancellationtoken,所以我们可以这样调用:

代码语言:javascript
复制
var cancelSource = new CancellationTokenSource();
Task foo = Foo(cancelSource.Token);
...
...(some time later)
cancelSource.Cancel();

Delay

CLR里大部分的异步方法都支持CancellationToken,包括Delay方法。

代码语言:javascript
复制
async Task Foo(CancellationToken cancellationToken)
{
    for(int i = 0; i<10; i++)
    {
        Console.WriteLine(i);
        await Task.Delay(1000,cancellationToken);
    }
}

这时,task在遇到请求时会立即停止(而不是1秒钟之后才停止)

这里,我们无需调用ThrowIfCancellationRequested,因为Delay会替我们做。

  • 取消标记在调用栈中很好的向下传播(就像是因为异常,取消请求在调用栈中向上级联一样)。

同步方法

同步方法也支持取消(例如Task的Wait方法)。这种情况下,取消指令需要异步发出(例如,来自另一个Task)

代码语言:javascript
复制
var cancelSource = new CancellationTokenSource();
Task.Delay(5000).ContinueWith(ant=>cancelSource.Cancel());      
...

其它

事实上,您可以在构造CancellationTokenSource时指定一个时间间隔,以便在一段时间后启动取消。它对于实现超时非常有用,无论是同步还是异步:

代码语言:javascript
复制
var cancelSource = new CancellationTokenSource();
try{ await Foo(cancelSource.Token); }
catch(OperationcanceledException ex){ Console.WriteLine("Cancelled"); }

CancellationToken这个struct提供了一个Register方法,它可以让你注册一个回调委托,这个委托会在取消时触发。它会返回一个对象,这个对象在取消注册时可以被Dispose掉。

编译器的异步函数生成的Task在遇到未处理的OperationCanceledException异常时会自动进入取消状态(IsCanceled返回true,IsFaulted返回false)

使用Task.Run创建Task也是如此。这里是指向构造函数传递(相同的)CnacellationToken。

在异步场景中,故障Task和取消的Task之间的区别并不重要,因为它们在await时都会抛出一个OperationcanceledException。但这在高级并行编程场景(特别是条件continuation)中很重要。

7.TAP Task-based Asynchoronous Pattern

  • .net core暴露了数百个返回task且可以await的异步方法(主要和I/O相关)。大多数方法都遵循一个模式,叫做基于Task的异步模式(TAP)。这是我们迄今为止所描述的合理形式化。TAP方法执行以下操作:
    • 返回一个“热”(运行中的)Task或Task
    • 方法名以Async结尾(除了好像Task组合器等情况)
    • 会被重载,以便接受CancellationToken或(和)IProgress ,如果支持相关操作的话。
    • 快速返回调用者(只有很小的初始化同步阶段)
    • 如果是I/O绑定,那么无需绑定线程

8.Task组合器

  • 异步函数有一个让其保持一致的协议(可以一致的返回Task),这能让其保证良好的结果:可以使用以及编写Task祝贺器,也就是可以组合Task,但是并不关心Task具体做什么。
  • CLR提供了两个Task组合器
    • Task.WhenAny
    • Task.WhenAll

假设定义了方法如下:

代码语言:javascript
复制
    aync Task<int> Delay1() { await Task.Delay(1000); return 1;  }
    aync Task<int> Delay2() { await Task.Delay(1000); return 2;  }
    aync Task<int> Delay3() { await Task.Delay(1000); return 3;  }

WhenAny

当一组Task中任何一个Task完成时,Task.WhenAny会返回完成的Task。

代码语言:javascript
复制
Task<int> winningTask = await Task.WhenAny(Delay1(),Delay2(),Delay3());
Console.WirteLine("Done");
Console.WirteLine(winningTask.Result);

因为Task.WhenAny本身就返回一个Task,我们对他进行await,就会返回最先完成的Task。

上例完全是非阻塞的,包括最后一行(当访问result属性时,winningTask已完成),但最好还是对winningTask进行await,因为异常无需AggregateExceotion包装就会重新抛出:

代码语言:javascript
复制
Console.WirteLine(await winningTask);

实际上,我们可以在一步中执行两个await:

代码语言:javascript
复制
Task<int> winningTask = await await Task.WhenAny(Delay1(),Delay2(),Delay3());

如果“没赢”的Task后续发生了错误,那么异常将不会被观察到,除非你后续对它们进行await(或者查询其Exception属性)

WhenAny很适合为不支持超时或取消的操作添加这些功能:

代码语言:javascript
复制
Task<string> task = SomeAsyncFunc();
Task winner = await (Task.WhenAny(task,Task.Delay(5000)));
if(winner != task) throw new TimeoutException();
string reuslt = await task;//Unwrap result/re-throw

注意:本例子中返回的结果是Task类型。

WhenAny

当传给它的所有的Task都完成后,Task.WhenAll会返回一个Task。

代码语言:javascript
复制
await Task.WhenAll(Delay1(),Delay2(),Delay3());
  • 本例就会在3秒后结束。

通过轮流对3个task进行awiat,也可以得到类似的结果:

代码语言:javascript
复制
Task task1 = Delay1(), task2 = Delay2(), task3 = Delay3();
await task1;await task2;await task3;

不同点是(除了3个await的低效):如果task1出错,我们就无需等待task2和task3了,它的错误也不会被观察到。

WhenAny异常

与之相对,Task.WhenAll直到所有Task完成,它才会完成,及时有错误发生。如果有多个错误,他们在的异常会包裹在Task的AggregateException里

await组合的Task,只会抛出第一个异常,想要看到所有的异常,你需要这样做:

代码语言:javascript
复制
Task task1 = Task.Run(()=>{ throw null; });
Task task2 = Task.Run(()=>{ throw null; });
Task all = Task.WhenAll(task1,task2);
try{ await all; }
catch
{
    Console.writeLine(all.Exception.InnerExceptions.Count);
}

对一组Task调用WhenAll会返回Task,也就是所有Task的组合结果。

如果进行await,那么就会得到TResult[]:

代码语言:javascript
复制
Task<int> task1 = Task.Run(()=>1);
Task<int> task2 = Task.Run(()=>2);
int[] results = await Task.WhenAll(task1,task2);

实例

代码语言:javascript
复制
async Task<int> GetTotalSize(string[] uris)
{
    IEnumerable<Task<byte[]>> downloadTasks = uris.Select(uri=>new WebClient().DownloadDataTaskAsync(uri));
    byte[][] contents = await Task.WhenAll(downloadTasks);
    return contents.Sum(c=>c.Lenght);
}


//语法优化
async Task<int> GetTotalSize(string[] uris)
{
    IEnumerable<Task<int>> downloadTasks = uris.Select(async uri=>await new WebClient().DownloadDataTaskAsync(uri).Length);
    int[] contentLengths = await Task.WhenAll(downloadTasks);
    return contentLengths.Sum();
}

自定义task组合器

可以编写自定义的Task组合器。最简单的组合器接收一个task,看下例:

代码语言:javascript
复制
async static Task<TResult> WithTimeout<TResult>(this Task<TResult> task,TimeSpan timeout)
{
    Task winner = await Task.WhenAny(task,TaskDelay(timeout)).ConfigureAwait(false);
    if(winner != task) throw new TimeoutException();
    return await task.ConfigureAwait(false);
}

这就是为等待的task添加了超时功能

因为这可能是一个库方法,无需与外界共享状态,所以在await时我们使用了ConfigureAwait(false)来避免弹回到UI的同步上下文。

通过在Task完成时取消Task.Delay我们可以改进上例的效率(避免了计时器的小开销):

代码语言:javascript
复制
async static Task<TResult> WithTimeout<TResult>(this Task<TResult> task,TimeSpan timeout)
{
    var cancelSource = new CancellationTokenSource();
    var delay = Task.Delay(timeout,cancelSource.Token);
    Task winner = await Task.WhenAny(task,delay).ConfigureAwait(false);
    if(winner == task)
        cancelSource.Cancel();
    else
        throw new TimeoutException();
    return await task.ConfigureAwait(false);
}

自定义task组合器 通过cancellationToken 放弃task

代码语言:javascript
复制
    static Task<TResult> WithCancellation<TResult>(this Task<TResult> task, CancellationToken cancelToken) 
    {
        var tcs = new TaskCompletionSource<TResult>();
        var reg = cancelToken.Register(()=> tcs.TrySetCanceled());
        task.ContinueWith(ant => 
        {
            reg.Dispose();
            if (ant.IsCanceled)
                tcs.TrySetCanceled();
            else if (ant.IsFaulted)
                tcs.TrySetException(ant.Exception.InnerException);
            else
                tcs.TrySetResult(ant.Result);
        });
        return tcs.Task;
    }

接下来在看一个例子,这个组合器功能类似WhenAll,如果一个Task出错,那么其余的Task也立即出错:

代码语言:javascript
复制
async Task<TResult[]> WhenAllOrError<TResult>(params Task<TResult>[] tasks)
{
    var killJoy = new TaskCompletionSource<TResult[]>();
    foreach (var task in tasks)
    {
        task.ContinueWith(ant=> 
        {
            if (ant.IsCanceled)
                killJoy.TrySetCanceled();
            else if (ant.IsFaulted)
                killJoy.TrySetException(ant.Exception.InnerException);
        });
    }
    return await await Task.WhenAny(killJoy.Task,Task.WhenAll(tasks)).ConfigureAwait(false);
}

上述代码中,TaskCompletionSourced的任务就是当任意一个Task出错时,结束工作。所以我们没有调用SetResult方法,只调用了它的TrySetCanceled和TrySetException方法。这里ContinueWith要比GetAwaiter().OnCompleted更方便,因为我们不访问Task的result,并且此刻不想弹回到UI线程。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JusterZhu 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概要
  • 二、详细内容
    • 1.同步异步
      • 什么是异步编程
      • 异步编程的两种用途
      • 经验之谈
    • 2.异步和Coninuation以及语言的支持
      • 语言对异步的支持非常重要
    • 3.await async
      • 异步方法如何执行
      • 可以await哪些?
      • 捕获本地状态
      • await之后在哪个线程上执行
      • UI上的await
      • 与粗粒度的并发相比
    • 4.编写异步函数
      • 异步中的同步上线文
        • OperationStarted 和 OperationCompleted
      • 5.优化同步完成
        • ValueTask
          • 6.取消 cancellation
            • CancellationToken 和 CancellationTokenSource
            • 获取CancellationToken
            • Delay
            • 同步方法
            • 其它
          • 7.TAP Task-based Asynchoronous Pattern
            • 8.Task组合器
              • WhenAny
              • WhenAny
              • WhenAny异常
              • 自定义task组合器
              • 自定义task组合器 通过cancellationToken 放弃task
          相关产品与服务
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档