问题:--我想使用他们的.NET SDK,从AWS S3中并行下载100个文件。下载的内容应该存储在100个内存流中(文件足够小,我可以从中提取)。我对任务、IAsyncResult、并行.*以及.NET 4.0中的其他不同方法感到困惑。
--如果我想自己解决这个问题--,从我的头顶上想出这样的伪代码:(编辑以向某些变量添加类型)
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;
AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;
// Prepare to launch requests
var asyncRequests = from rq in requestObjects
select _s3.BeginGetObject(rq,null,null);
// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();
// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched
select _s3.EndGetRequest(rq);
// Finish requests
var actualResponses = responses.ToList();
// Fetch data
var data = actualResponses.Select(rp => {
var ms = new MemoryStream();
rp.ResponseStream.CopyTo(ms);
return ms;
});
此代码并行启动100个请求,这是很好的。然而,有两个问题:
所以我开始觉得我走错路了.
帮助?
发布于 2012-05-07 11:19:03
如果将操作分解为一个方法,异步处理一个请求,然后调用它100次,可能会更容易。
首先,让我们确定您想要的最终结果。因为您将要使用的是一个MemoryStream
,这意味着您希望从您的方法返回一个Task
。签名将如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
因为AmazonS3
对象实现了异步设计模式,所以可以使用班级上的方法从实现异步设计模式的类生成Task<T>
,如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null);
// But what goes here?
因此,您已经在一个好地方了,您有一个Task<T>
,您可以在调用完成时等待或调用一个回调。但是,您需要以某种方式将从调用Task<GetObjectResponse>
返回的Task<GetObjectResponse>
转换为MemoryStream
。
为此,您希望在方法类上使用Task<T>
。把它看作是方法在班级上的异步版本,它只是对另一个Task<T>
的投影,只不过每次调用ContinueWith
时,您都可能会创建一个运行该部分代码的新任务。
这样,您的方法如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
// Start the task of downloading.
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null
);
// Translate.
Task<MemoryStream> translation = response.ContinueWith(t => {
using (Task<GetObjectResponse> resp = t ){
var ms = new MemoryStream();
t.Result.ResponseStream.CopyTo(ms);
return ms;
}
});
// Return the full task chain.
return translation;
}
请注意,在上面,您可以调用传递ContinueWith
的TaskContinuationOptions.ExecuteSynchronously
,因为您所做的工作似乎很少(我无法判断,响应可能很大)。在您所做的工作非常少的情况下,为了完成工作而启动一个新任务将是有害的,您应该通过TaskContinuationOptions.ExecuteSynchronously
,这样就不会浪费时间为最小的操作创建新的任务。
既然有了可以将一个请求转换为Task<MemoryStream>
的方法,那么创建一个包装器来处理任意数量的请求就很简单了:
static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
// Just call Select on the requests, passing our translation into
// a Task<MemoryStream>.
// Also, materialize here, so that the tasks are "hot" when
// returned.
return requests.Select(r => GetMemoryStreamAsync(s3, r)).
ToArray();
}
在上面,您只需获取GetObjectRequest
实例的序列,它将返回一个Task<MemoryStream>
数组。它返回物化序列这一事实很重要。如果在返回之前没有将其具体化,那么在迭代序列之前不会创建任务。
当然,如果您想要这种行为,那么无论如何,只需删除对.ToArray()
的调用,让方法返回IEnumerable<Task<MemoryStream>>
,然后在迭代任务时发出请求。
在那里,您可以一次一个地处理它们(在循环中使用方法 ),或者等待它们全部完成(通过调用方法)。后者的一个例子是:
static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
Task.WaitAll(tasks);
return tasks.Select(t => t.Result).ToList();
}
另外,应该指出,这非常适合于反应性扩展框架,因为这非常适合于IObservable
实现。
发布于 2022-03-15 11:08:20
您可以从Nexus.Tasks包中使用Nexus.Core。
var response = await fileNames
.WhenAll(item => GetObject(item, cancellationToken), 10, cancellationToken)
.ConfigureAwait(false);
https://stackoverflow.com/questions/10486822
复制相似问题