首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在流水线混合网络和文件系统IO中使用Rx.NET?

如何在流水线混合网络和文件系统IO中使用Rx.NET?
EN

Stack Overflow用户
提问于 2015-10-06 21:41:06
回答 1查看 431关注 0票数 1

我有以下要求:

  1. 从多个远程站点收集某些信息。
  2. 将信息序列化到磁盘。
  3. 与相同的站点联系,并确认数据已成功收集。

这是一个非常简化的流程,真正的流程还必须处理故障和其他方面,我认为这与我的问题似乎暂时无关。

无论如何,下面是我如何实现所描述的流程:

代码语言:javascript
运行
复制
var data = await GetSitesSource()
    .Select(site => Observable
        .FromAsync(() => GetInformationFromSiteAsync(site))
        .Select(site.MakeKeyValuePair))
    .Merge(maxConcurrentSiteRequests)
    .ToList();

if (data.Count > 0)
{
    var filePath = GetFilePath();
    using (var w = new StreamWriter(filePath))
    {
        await w.WriteAsync(YieldLines(data));
    }
    var tsUTC = DateTime.UtcNow;
    await data.ToObservable()
        .Select(o => Observable.FromAsync(() => AckInformationFromSiteAsync(o.Key, tsUTC, o.Value.InformationId)))
        .Merge(maxConcurrentSiteRequests);
}

其中:

  • MakeKeyValuePair是返回KeyValuePair<K,V>实例的扩展方法。
  • YieldLinesdata转换为IEnumerable<string>
  • WriteAsync是一种虚构的扩展方法,它将一系列字符串写入其StreamWriter

这似乎不是一个很好的实现,因为我没有利用这样一个事实:当记录从第一个Merge操作符出来时,我就可以开始写记录了。

我可以使用SelectMany + Merge(1)操作符异步地将块写入文件(顺序并不重要),但是如何确保仅在需要时才初始化相应的StreamWriter并将其正确处理?因为如果没有数据,我甚至不想初始化StreamWriter

我的问题--如何重写这段代码,使可观察到的管道在中间不被中断以写出文件?它应包括所有三个阶段:

  1. 从多个站点获取数据
  2. 把数据一个一个地写下来,顺序无关紧要。
  3. 一旦写入所有数据,请确认数据。
EN

回答 1

Stack Overflow用户

发布于 2015-10-07 07:12:39

我还没有对此进行测试,但您的代码中没有一个排除了将其连接在一起的可能性。所以你可以这样做:

代码语言:javascript
运行
复制
//The ToObservable extension for Task is only available through
using System.Reactive.Threading.Tasks;

GetSitesSource()
    .Select(site => Observable
        .FromAsync(() => GetInformationFromSiteAsync(site))
        .Select(site.MakeKeyValuePair))
    .Merge(maxConcurrentSiteRequests)
    .ToList()
    //Only proceed if we received data
    .Where(data => data.Count > 0)
    .SelectMany(data =>
      //Gives the StreamWriter the same lifetime as this Observable once it subscribes
      Observable.Using(
        () => new StreamWriter(GetFilePath()), 
        (w) => w.WriteAsync(YieldLines(data)).ToObservable()),
      //We are interested in the original data value, not the write result
      (data, _) => data)
    //Attach a timestamp of when data passed through here
    .Timestamp()
    .SelectMany(o=> {
      var ts = o.Timestamp;
      var data= o.Value;
      //This is actually returning IEnumerable<IObservable<T>> but merge
      //will implicitly handle it.
      return data.Select(i => Observable.FromAsync(() => 
                               AckInformationFromSiteAsync(i.Key, ts,
                                                           i.Value.InformationId)))
                .Merge(maxConcurrentSiteRequests);
    })
    //Handle the return values, fatal errors and the completion of the stream.
    .Subscribe();

更全面地回答你的问题

Using操作符将必须实现IDisposable的资源绑定到可观察对象的生存期。第一个参数是一个工厂函数,它将在订阅可观测值时被调用一次。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32980247

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档