首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用reqwest执行并行异步HTTP GET请求?

如何使用reqwest执行并行异步HTTP GET请求?
EN

Stack Overflow用户
提问于 2018-06-26 21:46:20
回答 2查看 13.6K关注 0票数 35

The async example很有用,但作为Rust和Tokio的新手,我正在努力弄清楚如何一次处理N个请求,使用向量中的URL,并为每个URL创建一个作为字符串的响应HTML迭代器。

如何做到这一点呢?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-06-27 00:41:25

并发请求

从reqwest 0.10开始:

代码语言:javascript
运行
复制
use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}

stream::iter(urls)

stream::iter

获取一个字符串集合并将其转换为Stream

.map(|url| {

StreamExt::map

在流中的每个元素上运行异步函数,并将元素转换为新类型。

让客户端=客户端;异步移动{

获取对Client的显式引用,并将该引用(而不是原始Client)移动到匿名异步块中。

=client.get(Url).send().await?

使用Client的连接池启动异步GET请求并等待该请求。

resp.bytes().await

请求并等待响应的字节数。

.buffer_unordered(N);

StreamExt::buffer_unordered

将一个期货流转换为这些未来值的流,同时执行这些期货。

异步正文.for_each(|b| {

{.for_each b{ Ok(b) => println!(“ {} bytes",b.len()),Err(e) => eprintln!("Got an error:{}",e),}) .await;

StreamExt::for_each

将流转换回单个未来,打印出一路上接收到的数据量,然后等待未来完成。

另请参阅:

没有限制的执行

如果您愿意,还可以将迭代器转换为未来的迭代器,并使用future::join_all

代码语言:javascript
运行
复制
use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}

我鼓励使用第一个示例,因为您通常希望限制并发,而bufferbuffer_unordered可以帮助您。

并行请求

并发请求通常是足够好的,但有时您需要需要并行请求。在这种情况下,您需要生成一个任务。

代码语言:javascript
运行
复制
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}

主要区别是:

  • 我们使用tokio::spawn在不同的任务中执行工作。
  • 我们必须为每个任务分配自己的reqwest::Client。作为recommended,我们克隆一个共享客户端,以便在无法加入任务时使用connection pool.
  • There's,这是一个附加错误情况。

另请参阅:

票数 79
EN

Stack Overflow用户

发布于 2021-10-25 23:02:15

如果可能的话,我建议你使用std async和rayon。它们现在都很成熟,并且非常容易上手,因为标准中的异步{/*代码这里*/}作用域的界限。您还可以使用功能集成https://docs.rs/async-std/1.10.0/async_std/#features在tokio中/与tokio一起工作

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51044467

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档