首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >提取zip、解析文件并将其压缩到CSV

提取zip、解析文件并将其压缩到CSV
EN

Stack Overflow用户
提问于 2017-02-22 14:05:10
回答 2查看 309关注 0票数 2

我试图最大限度地提高以下任务的性能:

  • 枚举zip文件的目录
  • 在内存中提取zip以查找.json文件(处理嵌套的zip)
  • 解析json文件
  • 将属性从json文件写入聚合的.CSV文件

我要做的TPL布局是:

代码语言:javascript
运行
复制
producer -> parser block -> batch block -> csv writer block

其思想是,单个生产者提取zip并找到json文件,将文本发送到并行运行的解析器块(多使用者)。批处理块被分组为200,写入块将200行转储到每个调用的CSV文件中。

问题:

  • jsonParseBlock TransformBlock花费的时间越长,丢弃的消息就越多。我怎么才能阻止这一切?
  • 如何更好地利用TPL来最大化性能? 类项目{公共字符串ID { get;set;}公共字符串名称{ get;set;} class Demo { const string OUT_FILE = @"c:\temp\tplflat.csv";const字符串DATA_DIR = @"c:\temp\tpldata";静态ExecutionDataflowBlockOptions parseOpts =新ExecutionDataflowBlockOptions() { SingleProducerConstrained=true,MaxDegreeOfParallelism = 8,BoundedCapacity = 100 };静态ExecutionDataflowBlockOptions writeOpts =新ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };公共静态void (){ Console.WriteLine($"{Environment.ProcessorCount}处理器可用“);_InitTest();//重设csv文件,在需要时生成测试数据// start填充var = Stopwatch.StartNew();//转换器var jsonParseBlock =新TransformBlock(rawstr => { var item = Newtonsoft.Json.JsonConvert.DeserializeObject(rawstr);)System.Threading.Thread.Sleep(15);//这里睡眠越多,丢失返回项的消息越多;},parseOpts);//批处理块jsonBatchBlock =新的BatchBlock(200);// writer flatWriterBlock =新的ActionBlock(条目=> {//Console.WriteLine($“Wing{items.Length} to csv");StringBuilder sb =新StringBuilder();foreach (var item in Item){ sb.AppendLine($"{item.ID},{item.Name}");} File.AppendAllText(OUT_FILE,sb.ToString();});jsonParseBlock.LinkTo(jsonBatchBlock,new DataflowLinkOptions { PropagateCompletion = true });jsonBatchBlock.LinkTo(flatWriterBlock,new DataflowLinkOptions { PropagateCompletion = true });//开始执行以下工作: var crawlerTask = GetJsons(DATA_DIR,jsonParseBlock);crawlerTask.Wait();flatWriterBlock.Completion.Wait();Console.WriteLine($“警告: tplflat.csv行计数应该与测试数据匹配”);Console.WriteLine($“在{sw.ElapsedMilliseconds / 1000.0} secs中完成”);}静态异步任务GetJsons(string,ITargetBlock队列){ int count = 1;foreach (var zip in Directory.EnumerateFiles(filepath,"*.zip")) { Console.WriteLine($"working on zip #{count++}");var zipStream =新FileStream(zip,FileMode.Open);等待ExtractJsonsInMemory(zip,zipStream,queue);} queue.Complete();}静态异步任务ExtractJsonsInMemory(字符串文件名、流流、ITargetBlock队列){ ZipArchive存档=新的ZipArchive(流);foreach (ZipArchiveEntry entry in archive.Entries) { if (entry.Name.EndsWith(".json",StringComparison.OrdinalIgnoreCase)) { new (TextReader reader =new StreamReader(entry.Open(),Encoding.UTF8)) { var jsonText = reader.ReadToEnd();等待queue.SendAsync(jsonText);}}如果(entry.Name.EndsWith(".zip",StringComparison.OrdinalIgnoreCase)) {等待ExtractJsonsInMemory(entry.FullName,entry.Open(),队列);}

Update1

我添加了async,但我不清楚如何等待所有数据流块的完成( c#、异步和tpl都是新的)。我基本上想说,“一直运行到所有的队列/块都是空的”。我添加了下面的“等待”代码,并且似乎正在工作。

代码语言:javascript
运行
复制
// wait for crawler to finish
crawlerTask.Wait(); 
// wait for the last block
flatWriterBlock.Completion.Wait(); 
EN

Stack Overflow用户

回答已采纳

发布于 2017-02-22 18:22:52

简而言之,您的投递和忽略返回值。您有两个选项:添加一个未绑定的BufferBlock来保存所有传入数据,或者在SendAsync上存储await,这将防止任何消息被丢弃。

代码语言:javascript
运行
复制
static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
{
    var archive = new ZipArchive(stream);
    foreach (ZipArchiveEntry entry in archive.Entries)
    {
        if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
        {
            using (var reader = new StreamReader(entry.Open(), Encoding.UTF8))
            {
                var jsonText = reader.ReadToEnd();
                await queue.SendAsync(jsonText);
            }
        }
        else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
        {
            await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
        }
    }
}

您需要将异步拉回原来的位置,但这应该会让您开始工作。

票数 1
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42393721

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档