The async example很有用,但作为Rust和Tokio的新手,我正在努力弄清楚如何一次处理N个请求,使用向量中的URL,并为每个URL创建一个作为字符串的响应HTML迭代器。
如何做到这一点呢?
发布于 2018-06-27 00:41:25
并发请求
从reqwest 0.10开始:
use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]
const CONCURRENT_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = stream::iter(urls)
.map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
})
.buffer_unordered(CONCURRENT_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
})
.await;
}
stream::iter(urls)
获取一个字符串集合并将其转换为Stream
。
.map(|url| {
在流中的每个元素上运行异步函数,并将元素转换为新类型。
让客户端=客户端;异步移动{
获取对Client
的显式引用,并将该引用(而不是原始Client
)移动到匿名异步块中。
让
=client.get(Url).send().await?
使用Client
的连接池启动异步GET请求并等待该请求。
resp.bytes().await
请求并等待响应的字节数。
.buffer_unordered(N);
将一个期货流转换为这些未来值的流,同时执行这些期货。
异步正文.for_each(|b| {
{.for_each b{ Ok(b) => println!(“ {} bytes",b.len()),Err(e) => eprintln!("Got an error:{}",e),}) .await;
将流转换回单个未来,打印出一路上接收到的数据量,然后等待未来完成。
另请参阅:
then
, and\_then
and or\_else
in Rust futures?没有限制的执行
如果您愿意,还可以将迭代器转换为未来的迭代器,并使用future::join_all
use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = future::join_all(urls.into_iter().map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
}))
.await;
for b in bodies {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
}
}
我鼓励使用第一个示例,因为您通常希望限制并发,而buffer
和buffer_unordered
可以帮助您。
并行请求
并发请求通常是足够好的,但有时您需要需要并行请求。在这种情况下,您需要生成一个任务。
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]
const PARALLEL_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let urls = vec!["https://api.ipify.org"; 2];
let client = Client::new();
let bodies = stream::iter(urls)
.map(|url| {
let client = client.clone();
tokio::spawn(async move {
let resp = client.get(url).send().await?;
resp.bytes().await
})
})
.buffer_unordered(PARALLEL_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(Ok(b)) => println!("Got {} bytes", b.len()),
Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
}
})
.await;
}
主要区别是:
tokio::spawn
在不同的任务中执行工作。reqwest::Client
。作为recommended,我们克隆一个共享客户端,以便在无法加入任务时使用connection pool.另请参阅:
发布于 2021-10-25 23:02:15
如果可能的话,我建议你使用std async和rayon。它们现在都很成熟,并且非常容易上手,因为标准中的异步{/*代码这里*/}作用域的界限。您还可以使用功能集成https://docs.rs/async-std/1.10.0/async_std/#features在tokio中/与tokio一起工作
https://stackoverflow.com/questions/51044467
复制相似问题