我试图最大限度地提高以下任务的性能:
.json
文件(处理嵌套的zip)json
文件json
文件写入聚合的.CSV
文件我要做的TPL布局是:
producer -> parser block -> batch block -> csv writer block
其思想是,单个生产者提取zip并找到json文件,将文本发送到并行运行的解析器块(多使用者)。批处理块被分组为200,写入块将200行转储到每个调用的CSV文件中。
问题:
TransformBlock
花费的时间越长,丢弃的消息就越多。我怎么才能阻止这一切?TPL
来最大化性能?
类项目{公共字符串ID { get;set;}公共字符串名称{ get;set;} class Demo { const string OUT_FILE = @"c:\temp\tplflat.csv";const字符串DATA_DIR = @"c:\temp\tpldata";静态ExecutionDataflowBlockOptions parseOpts =新ExecutionDataflowBlockOptions() { SingleProducerConstrained=true,MaxDegreeOfParallelism = 8,BoundedCapacity = 100 };静态ExecutionDataflowBlockOptions writeOpts =新ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };公共静态void (){ Console.WriteLine($"{Environment.ProcessorCount}处理器可用“);_InitTest();//重设csv文件,在需要时生成测试数据// start填充var = Stopwatch.StartNew();//转换器var jsonParseBlock =新TransformBlock(rawstr => { var item = Newtonsoft.Json.JsonConvert.DeserializeObject(rawstr);)System.Threading.Thread.Sleep(15);//这里睡眠越多,丢失返回项的消息越多;},parseOpts);//批处理块jsonBatchBlock =新的BatchBlock(200);// writer flatWriterBlock =新的ActionBlock(条目=> {//Console.WriteLine($“Wing{items.Length} to csv");StringBuilder sb =新StringBuilder();foreach (var item in Item){ sb.AppendLine($"{item.ID},{item.Name}");} File.AppendAllText(OUT_FILE,sb.ToString();});jsonParseBlock.LinkTo(jsonBatchBlock,new DataflowLinkOptions { PropagateCompletion = true });jsonBatchBlock.LinkTo(flatWriterBlock,new DataflowLinkOptions { PropagateCompletion = true });//开始执行以下工作: var crawlerTask = GetJsons(DATA_DIR,jsonParseBlock);crawlerTask.Wait();flatWriterBlock.Completion.Wait();Console.WriteLine($“警告: tplflat.csv行计数应该与测试数据匹配”);Console.WriteLine($“在{sw.ElapsedMilliseconds / 1000.0} secs中完成”);}静态异步任务GetJsons(string,ITargetBlock队列){ int count = 1;foreach (var zip in Directory.EnumerateFiles(filepath,"*.zip")) { Console.WriteLine($"working on zip #{count++}");var zipStream =新FileStream(zip,FileMode.Open);等待ExtractJsonsInMemory(zip,zipStream,queue);} queue.Complete();}静态异步任务ExtractJsonsInMemory(字符串文件名、流流、ITargetBlock队列){ ZipArchive存档=新的ZipArchive(流);foreach (ZipArchiveEntry entry in archive.Entries) { if (entry.Name.EndsWith(".json",StringComparison.OrdinalIgnoreCase)) { new (TextReader reader =new StreamReader(entry.Open(),Encoding.UTF8)) { var jsonText = reader.ReadToEnd();等待queue.SendAsync(jsonText);}}如果(entry.Name.EndsWith(".zip",StringComparison.OrdinalIgnoreCase)) {等待ExtractJsonsInMemory(entry.FullName,entry.Open(),队列);}Update1
我添加了async
,但我不清楚如何等待所有数据流块的完成( c#、异步和tpl都是新的)。我基本上想说,“一直运行到所有的队列/块都是空的”。我添加了下面的“等待”代码,并且似乎正在工作。
// wait for crawler to finish
crawlerTask.Wait();
// wait for the last block
flatWriterBlock.Completion.Wait();
发布于 2017-02-22 18:22:52
简而言之,您的投递和忽略返回值。您有两个选项:添加一个未绑定的BufferBlock
来保存所有传入数据,或者在SendAsync
上存储await
,这将防止任何消息被丢弃。
static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
{
var archive = new ZipArchive(stream);
foreach (ZipArchiveEntry entry in archive.Entries)
{
if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
{
using (var reader = new StreamReader(entry.Open(), Encoding.UTF8))
{
var jsonText = reader.ReadToEnd();
await queue.SendAsync(jsonText);
}
}
else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
{
await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
}
}
}
您需要将异步拉回原来的位置,但这应该会让您开始工作。
https://stackoverflow.com/questions/42393721
复制相似问题