我有以下要求:
这是一个非常简化的流程,真正的流程还必须处理故障和其他方面,我认为这与我的问题似乎暂时无关。
无论如何,下面是我如何实现所描述的流程:
var data = await GetSitesSource()
.Select(site => Observable
.FromAsync(() => GetInformationFromSiteAsync(site))
.Select(site.MakeKeyValuePair))
.Merge(maxConcurrentSiteRequests)
.ToList();
if (data.Count > 0)
{
var filePath = GetFilePath();
using (var w = new StreamWriter(filePath))
{
await w.WriteAsync(YieldLines(data));
}
var tsUTC = DateTime.UtcNow;
await data.ToObservable()
.Select(o => Observable.FromAsync(() => AckInformationFromSiteAsync(o.Key, tsUTC, o.Value.InformationId)))
.Merge(maxConcurrentSiteRequests);
}
其中:
MakeKeyValuePair
是返回KeyValuePair<K,V>
实例的扩展方法。YieldLines
将data
转换为IEnumerable<string>
WriteAsync
是一种虚构的扩展方法,它将一系列字符串写入其StreamWriter
。这似乎不是一个很好的实现,因为我没有利用这样一个事实:当记录从第一个Merge
操作符出来时,我就可以开始写记录了。
我可以使用SelectMany
+ Merge(1)
操作符异步地将块写入文件(顺序并不重要),但是如何确保仅在需要时才初始化相应的StreamWriter
并将其正确处理?因为如果没有数据,我甚至不想初始化StreamWriter
。
我的问题--如何重写这段代码,使可观察到的管道在中间不被中断以写出文件?它应包括所有三个阶段:
发布于 2015-10-07 07:12:39
我还没有对此进行测试,但您的代码中没有一个排除了将其连接在一起的可能性。所以你可以这样做:
//The ToObservable extension for Task is only available through
using System.Reactive.Threading.Tasks;
GetSitesSource()
.Select(site => Observable
.FromAsync(() => GetInformationFromSiteAsync(site))
.Select(site.MakeKeyValuePair))
.Merge(maxConcurrentSiteRequests)
.ToList()
//Only proceed if we received data
.Where(data => data.Count > 0)
.SelectMany(data =>
//Gives the StreamWriter the same lifetime as this Observable once it subscribes
Observable.Using(
() => new StreamWriter(GetFilePath()),
(w) => w.WriteAsync(YieldLines(data)).ToObservable()),
//We are interested in the original data value, not the write result
(data, _) => data)
//Attach a timestamp of when data passed through here
.Timestamp()
.SelectMany(o=> {
var ts = o.Timestamp;
var data= o.Value;
//This is actually returning IEnumerable<IObservable<T>> but merge
//will implicitly handle it.
return data.Select(i => Observable.FromAsync(() =>
AckInformationFromSiteAsync(i.Key, ts,
i.Value.InformationId)))
.Merge(maxConcurrentSiteRequests);
})
//Handle the return values, fatal errors and the completion of the stream.
.Subscribe();
更全面地回答你的问题
Using
操作符将必须实现IDisposable
的资源绑定到可观察对象的生存期。第一个参数是一个工厂函数,它将在订阅可观测值时被调用一次。
https://stackoverflow.com/questions/32980247
复制相似问题