C# 多线程七之Parallel

1、简介

关于Parallel不想说太多,因为它是Task的语法糖,至少我是这么理解的,官方文档也是这么说的,它本身就是基本Task的.假设我们有一个集合,不管是什么集合,我们要遍历它,首先想到的是For(如何涉及到修改或者读可以用for)或者Foreach(如果单纯的读),但是它两是同步的去操作集合,但是使用Parallel的静态For或者Foreach那就可以让多个线程参与这个工作,这样就能充分的利用CPU,但是你需要考虑CPU上下文产生的性能消耗,以及Parallel本身的性能消耗,所以,这也能解释为什么,你的循环里面执行的是不耗时的操作,使用for或者foreach的速度比使用Parallel的要快,所以使用Parallel还是要慎重.而且使用Parallel还需要注意的一点就是,不能有多线程争用问题,就是你的循环体里面不能有操作静态资源的操作.如果真的需要,那你可以加锁,但是那就失去它的优势了.

2、使用注意点

(1)、不能操作共享资源,代码如下:

        static int shareData = 10;
        static void Main(string[] args)
        {
            Parallel.For(1, 100000, i => Add(i));
            Console.Write(shareData);
            Console.ReadKey();
        }

        static void Add(int i)
        {
            shareData += i;
        }

代码逻辑很简单,1+2+3+......+100000这个过程中每次都加一个10,怎么说都是正数,但是,信不信它能给你加出个负数来(可能是内存溢出了),而且每次的结果都不一样(重要)!自己试试吧!

解决方案很简单,加个锁,代码如下:

    class ParallerStudy
    {
        static int shareData = 10;
        static object lockObj = new object();
        static void Main(string[] args)
        {
            Parallel.For(1, 100000, i => Add(i));
            Console.Write(shareData);
            Console.ReadKey();
        }

        static void Add(int i)
        {
            lock (lockObj)
            {
                shareData += i;
            }
        }
    }

这个肯定是正确的值,因为每次的输出都是这个,这里因为如果给循环的最终值设小的话,他好像是同步去做了,不会有问题,所以这里给了个100000,这个时候它会开多个线程去做.

(2)、它可以向Task一样抛出异常,都是AggregateException,代码如下:

这里就给截图了,不写代码了.

(3)、性能开销

这个不用多说,它是基于Task,开销还是有的,如果不清楚,去看我前面的文章

(4)、支持取消

取消貌似只能取消整个Parallel运算,不支持取消内部的方法,我试了不行,而且必须在执行Parallel之前取消它,之后都不行.很其怪,可能我的调用方式有问题,如果你们有好的方法,欢迎在下面评论.

(4)、可以设置最多的线程数 

实战中有演示

(5)、调度器

这里就不介绍了,后续的随笔中会介绍

(6)、三个重要的委托

实战中有演示 

3、实战

(1)、下面写个使用Parallel多线程去读文件的例子

代码如下:

    class ParallerStudy
    {
        static void Main(string[] args)
        {
            var targetPath = $"{AppDomain.CurrentDomain.BaseDirectory}Test";
            var totalLength = DictionaryFilesContent(targetPath, "", SearchOption.TopDirectoryOnly);
            Console.WriteLine("{0}目录下所有的文件长度总和为:{1}", targetPath, totalLength);
            Console.ReadKey();
        }

        /// <summary>
        /// 多线程读取多个文件的内容
        /// </summary>
        /// <param name="i"></param>
        /// <returns></returns>
        static long DictionaryFilesContent(string path, string searchPattern, SearchOption searchOptions)
        {
            var opts = new ParallelOptions();
            //只允许开六个线程去做这个事情
            opts.MaxDegreeOfParallelism =3;
            var files = Directory.EnumerateFiles(path);
            long totalFileLength = 0;
            Parallel.ForEach<string, long>(
                files,
                opts,
                //初始委托,该方法会在线程执行主要任务前执行,可用于参数校验等操作
                () =>
                {
                    Console.WriteLine("开启读取文件,当前线程Id为{0}", Thread.CurrentThread.ManagedThreadId);
                    return 0;
                },
                //主体委托,开始干正事
                (file, loopstate, index, taskLocalCount) =>
                {
                    long fileLength = 0;
                    FileStream fs = null;
                    try
                    {
                        fs = File.OpenRead(file);
                        fileLength = fs.Length;
                        Console.WriteLine("当前线程Id为{1},当前文件名为{2}", index, Thread.CurrentThread.ManagedThreadId, fs.Name);

                    }
                    catch (IOException) { }//排除拒绝访问的文件
                    finally
                    {
                        if (fs != null)
                            fs.Dispose();
                    }
                    return taskLocalCount + fileLength;
                },
                //终结委托,一般用于汇总主体委托的结果值,所以如果这里涉及访问共享资源的话,一般会用同步构造,也就是加锁等操作
                taskLocalCount =>
                {
                    //taskLocalCount=taskLocalCount + fileLength,单个文件的长度
                    //同步构造,不需要加锁,当每个线程读取完对应文件的长度后,将长度加到totalFileLength中,这个时候多个线程访问这个变量可能会出现
                    //多线程争用问题,但是使用Interlocked.Add相当与给totalFileLength加锁
                    Interlocked.Add(ref totalFileLength, taskLocalCount);
                });
            return totalFileLength;
        }
    }

看这个例子前,还在想真的有这么厉害吗?其实也就那样,根据输出可以发现,一个开了3个线程,去读10个文件,我还在想这里面会不会有多线程争用问题,但是没有,你看它怎么做的,每个线程只会去读一个文件,读的快的,立即去读另外的文件,我执行了N次,发现并没有一个文件多个线程读的问题,所以每个线程只会去读一个文件,自然就不会有多线程争用问题了.

(2)、关于ParallelLoopState的用法

Stop()和Break方法最常用,当子任务处理批量的任务时,如果满足某种条件,则告诉其余的任务不需要在处理了.

这个对象主要用于Parallel开启的子任务群,它们内部之间的交流,代码如下:

        static void Main(string[] args)
        {
            var targetPath = $"{AppDomain.CurrentDomain.BaseDirectory}Test";
            var totalLength = DictionaryFilesContent(targetPath, "", SearchOption.TopDirectoryOnly);
            Console.WriteLine("{0}目录下所有的文件长度总和为:{1}", targetPath, totalLength);
            Console.ReadKey();
        }

        /// <summary>
        /// 多线程读取多个文件的内容
        /// </summary>
        /// <param name="i"></param>
        /// <returns></returns>
        static long DictionaryFilesContent(string path, string searchPattern, SearchOption searchOptions)
        {
            var opts = new ParallelOptions();
            //只允许开六个线程去做这个事情
            opts.MaxDegreeOfParallelism =3;
            var files = Directory.EnumerateFiles(path);
            long totalFileLength = 0;
            Parallel.ForEach<string, long>(
                files,
                opts,

                //初始委托,该方法会在线程执行主要任务前执行,可用于参数校验等操作
                () =>
                {
                    Console.WriteLine("开启读取文件,当前线程Id为{0}", Thread.CurrentThread.ManagedThreadId);
                    return 0;
                },
                //主体委托,开始干正事
                (file, loopstate, index, taskLocalCount) =>
                {
                    long fileLength = 0;
                    FileStream fs = null;
                    try
                    {
                        //处理完第3项,就不要在处理了,这个第三项的意思是不是第三个文件,也可能是第五个文件
                        if (index == 3)
                        {
                            loopstate.Stop();
                        }
                        fs = File.OpenRead(file);
                        fileLength = fs.Length;
                        Console.WriteLine("当前线程Id为{1},当前文件名为{2}", index, Thread.CurrentThread.ManagedThreadId, fs.Name);

                    }
                    catch (IOException) { }//排除拒绝访问的文件
                    finally
                    {
                        if (fs != null)
                            fs.Dispose();
                    }
                    return taskLocalCount + fileLength;
                },
                //终结委托,一般用于汇总主体委托的结果值,所以如果这里涉及访问共享资源的话,一般会用同步构造,也就是加锁等操作
                taskLocalCount =>
                {
                    //taskLocalCount=taskLocalCount + fileLength,单个文件的长度
                    //同步构造,不需要加锁,当每个线程读取完对应文件的长度后,将长度加到totalFileLength中,这个时候多个线程访问这个变量可能会出现
                    //多线程争用问题,但是使用Interlocked.Add相当与给totalFileLength加锁
                    Interlocked.Add(ref totalFileLength, taskLocalCount);
                });
            return totalFileLength;
        }

 还有其它的一些用法,这里就不介绍了,Api里面都有介绍.

(3)、Parallel的返回值

就说一个LowestBreakIteration,如果这个返回值为null,说明子任务群有个调用了Stop方法,如果不为null,说明有个调用了Break方法且值为调用Break任务对应的Index值.

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券