在上篇最后一个例子之后,我们发现了怎么去使用线程池,调用ThreadPool的QueueUserWorkItem方法来发起一次异步的、计算限制的操作,例子很简单,不是吗?
然而,在今天这篇博客中,我们要知道的是,QueueUserWorkItem这个技术存在许多限制。其中最大的问题是没有一个内建的机制让你知道操作在什么时候完成,也没有一个机制在操作完成是获得一个返回值,这些问题使得我们都不敢启用这个技术。
Microsoft为了克服这些限制(同时解决其他一些问题),引入了任务(tasks)的概念。顺带说一下我们得通过System.Threading.Tasks命名空间来使用它们。
现在我要说的是,用线程池不是调用ThreadPool的QueueUserWorkItem方法,而是用任务来做相同的事:
static void Main(string[] args)
{
Console.WriteLine("主线程启动");
//ThreadPool.QueueUserWorkItem(StartCode,5);
new Task(StartCode, 5).Start();
Console.WriteLine("主线程运行到此!");
Thread.Sleep(1000);
}
private static void StartCode(object i)
{
Console.WriteLine("开始执行子线程...{0}",i);
Thread.Sleep(1000);//模拟代码操作
}
}
嘿,你会发现结果是一样的。 再来看看这个是什么:
TaskCreationOptions这个类型是一个枚举类型,传递一些标志来控制Task的执行方式。TaskCreationOptions定义如下:
慢点,注释很详细,看看这些有好处,TaskScheduler(任务调度器)不懂没关系,请继续往下看,我会介绍的,但请注意,这些标识都只是一些提议而已,在调度一个Task时,可能会、也可能不会采纳这些提议,不过有一条要注意:AttachedToParent标志,它总会得到Task采纳,因为它和TaskScheduler本身无关。
来看下这段代码:
static void Main(string[] args)
{
//1000000000这个数字会抛出System.AggregateException
Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 1000000000);
//可以现在开始,也可以以后开始
t.Start();
//Wait显式的等待一个线程完成
t.Wait();
Console.WriteLine("The Sum is:"+t.Result);
}
private static Int32 Sum(Int32 i)
{
Int32 sum = 0;
for (; i > 0; i--)
checked { sum += i; }
return sum;
}
}
这段代码大家应该猜得出是什么意思吧,人人都会写。 但是,我的结果为什么是t.Result而不直接是返回的Sum呢? 有没有多此一举的感觉?
下面我来说说这段代码我想表达的意思:
在一个线程调用Wait方法时,系统会检查线程要等待的Task是否已经开始执行,如果任务正在执行,那么这个Wait方法会使线程阻塞,知道Task运行结束为止。
就说上面的程序执行,因为累加数字太大,它抛出算术运算溢出错误,在一个计算限制任务抛出一个未处理的异常时,这个异常会被“包含”不并存储到一个集合中,而线程池线程是允许返回到线程池中的,在调用Wait方法或者Result属性时,这个成员会抛出一个System.AggregateException对象。
现在你会问,为什么要调用Wait或者Result?或者一直不查询Task的Exception属性?你的代码就永远注意不到这个异常的发生,如果不能捕捉到这个异常,垃圾回收时,抛出AggregateException,进程就会立即终止,这就是“牵一发动全身”,莫名其妙程序就自己关掉了,谁也不知道这是什么情况。所以,必须调用前面提到的某个成员,确保代码注意到异常,并从异常中恢复。悄悄告诉你,其实在用Result的时候,内部会调用Wait。
怎么恢复?
为了帮助你检测没有注意到的异常,可以向TaskScheduler的静态UnobservedTaskException时间等级一个回调方法,当Task被垃圾回收时,如果出现一个没有被注意到的异常,CLR终结器会引发这个事件。一旦引发,就会向你的时间处理器方法传递一个UnobservedTaskExceptionEvenArgs对象,其中包含了你没有注意的AggregateException。然后再调用UnobservedTasExceptionEvenArgs的SetObserved方法来指出你的异常已经处理好了,从而阻止CLR终止进程。这是个图省事的做法,要少做这些,宁愿终止进程,也不要呆着已经损坏的状态而继续运行。做人也一样,病了宁肯休息,也不要带病坚持上班,你没那么伟大,公司也不需要你的这一点伟大,命是自己的。(─.─|||扯远了。
除了单个等待任务,Task 还提供了两个静态方法:WaitAny和WaitAll,他们允许线程等待一个Task对象数组。
WaitAny方法会阻塞调用线程,知道数组中的任何一个Task对象完成,这个方法会返回一个索引值,指明完成的是哪一个Task对象。如果发生超时,方法将返回-1。它可以通过一个CancellationToken取消,会抛出一个OperationCanceledException。
WaitAll方法也会阻塞调用线程,知道数组中的所有Task对象都完成,如果全部完成就返回true,如果超时就返回false。当然它也能取消,同样会抛出OperationCanceledException。
说了这么两个取消任务的方法,现在来试试这个方法,加深下印象,修改先前例子代码,完整代码如下:
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Task<Int32> t = new Task<Int32>(() => Sum(cts.Token,10000), cts.Token);
//可以现在开始,也可以以后开始
t.Start();
//在之后的某个时间,取消CancellationTokenSource 以取消Task
cts.Cancel();//这是个异步请求,Task可能已经完成了。我是双核机器,Task没有完成过
//注释这个为了测试抛出的异常
//Console.WriteLine("This sum is:" + t.Result);
try
{
//如果任务已经取消了,Result会抛出AggregateException
Console.WriteLine("This sum is:" + t.Result);
}
catch (AggregateException x)
{
//将任何OperationCanceledException对象都视为已处理。
//其他任何异常都造成抛出一个AggregateException,其中
//只包含未处理的异常
x.Handle(e => e is OperationCanceledException);
Console.WriteLine("Sum was Canceled");
}
}
private static Int32 Sum(CancellationToken ct ,Int32 i)
{
Int32 sum = 0;
for (; i > 0; i--)
{
//在取消标志引用的CancellationTokenSource上如果调用
//Cancel,下面这一行就会抛出OperationCanceledException
ct.ThrowIfCancellationRequested();
checked { sum += i; }
}
return sum;
}
}
这个例子展示了一个任务在进行的时候中途取消的操作,我觉得它很有趣,你试试也会发现。 Lamada表达式写这个,是个亮点,得学学,将CancellationToken闭包变量“传递”。
如果不用Lamada表达式,这问题还真不好解决:
Task<Int32> t = new Task<Int32>(() => Sum(cts.Token,10000), cts.Token);
Sum(cts.Token,10000) 内的Token需要和cts.Token关联起来,你还能想出怎么关联起来么?
好,任务取消也讲玩了,来看个更好用的技术:
static void Main(string[] args)
{
Task<Int32> t = new Task<Int32>(i => Sum((Int32)i),10000);
//可以现在开始,也可以以后开始
t.Start();
Task cwt = t.ContinueWith(task=>Console.WriteLine("The sum is:{0}",task.Result));
cwt.Wait();
}
private static Int32 Sum(Int32 i)
{
Int32 sum = 0;
for (; i > 0; i--)
{
checked { sum += i; }
}
return sum;
}
}