我正在尝试使用tokio异步运行时在Rust中编写一个web爬虫。我希望异步地获取/处理多个页面,但我也希望爬虫在到达末尾时停止(换句话说,如果没有什么可以抓取的东西)。到目前为止,我已经使用全从作为Future
提供的异步函数获得集体结果,但这显然需要程序事先知道要爬行的总页面。例如:
async fn fetch(_url: String) -> Result<String, ()> {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok(String::from("foo"))
}
#[tokio::main]
async fn main() {
let search_url = "https://example.com/?page={page_num}";
let futures = (1..=3)
.map(|page_num| search_url.replace("{page_num}", &page_num.to_string()))
.map(|url| fetch(url));
let _ = futures::future::try_join_all(futures).await.unwrap();
}
在这个简单的例子中,在实际获取页面之前,我必须知道要通过的总页面(1..=3
)。我想要的是,不提供任何范围和条件,以停止整个过程。(例如,如果HTML结果包含“未找到”)
我查看了在……上面,但我不确定它是否可以用于此任务。
发布于 2021-09-25 01:00:11
下面大概是如何使用Stream
和.buffered()
来完成这一任务
use futures::{future, stream, StreamExt};
#[derive(Debug)]
struct Error;
async fn fetch_page(page: i32) -> Result<String, Error> {
println!("fetching page: {}", page);
// simulate loading pages
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
if page < 5 {
// successfully got page
Ok(String::from("foo"))
} else {
// page doesn't exist
Err(Error)
}
}
#[tokio::main]
async fn main() {
let pages: Vec<String> = stream::iter(1..)
.map(fetch_page)
.buffered(10)
.take_while(|page| future::ready(page.is_ok()))
.map(|page| page.unwrap())
.collect()
.await;
println!("pages: {:?}", pages);
}
我将详细介绍main()
中的步骤:
stream::iter(1..)
创建一个表示每个页码的整数的无界Stream
。.map(fetch_page)
当然会为每个页码调用fetch_page
.buffered(10)
这将允许最多10个fetch_page
调用同时发生,并将保留原来的顺序。.take_while(|page| future::ready(page.is_ok()))
将保持流运行,直到fetch_page
返回错误为止,它使用futures::future::ready
,因为传递给take_while
的函数必须返回未来.map(|page| page.unwrap())
将取出成功的页面,它不会惊慌,因为我们知道当发生错误时流将停止.collect()
与迭代器做的事情本质上是一样的,除非您必须对它进行.await
。运行上述代码将输出以下内容,显示每次尝试10次,但只返回到第一次失败:
fetching page: 1
fetching page: 2
fetching page: 3
fetching page: 4
fetching page: 5
fetching page: 6
fetching page: 7
fetching page: 8
fetching page: 9
fetching page: 10
pages: ["foo", "foo", "foo", "foo"]
这掩盖了一些好的东西,如处理不丢失的页面错误或重试,但我希望这能给你一个良好的基础。在这些情况下,您可以使用TryStreamExt
上的方法,它专门处理Result
的流。
https://stackoverflow.com/questions/69321918
复制相似问题