
“异步取消不是简单的
Ctrl+C,而是在并发世界中,如何让一万个正在飞行的任务优雅着陆的艺术。”
在传统同步编程中,取消一个操作相对直白:要么等它结束,要么强制杀死。但在异步世界里,当你面对:
传统的 kill -9 式暴力取消会导致:
本文将从 Rust 类型系统 出发,逐层剖析四种取消策略,并给出生产级实现。

策略 | 核心机制 | 取消延迟 | 资源清理 | 典型场景 | 缺点 |
|---|---|---|---|---|---|
RAII Drop | 作用域退出 | 0 ns | 同步立即 | 互斥锁、文件 | 无法跨 await |
协作式 select! | 手动轮询 | 10-100 μs | 显式手动 | 网络 I/O | 需改造业务代码 |
强制式 Abort | 任务销毁 | 1-10 ms | 异步延迟 | CPU 密集计算 | 可能泄漏资源 |
层级式 Token | 树形传播 | 10-50 μs | 递归传播 | 微服务调用链 | 需统一 API |
use std::ops::Drop;
struct DatabaseConnection {
id: u64,
}
impl Drop for DatabaseConnection {
fn drop(&mut self) {
println!("Connection {} returned to pool", self.id);
// 实际场景:归还到连接池
}
}
#[tokio::main]
async fn main() {
{
let conn = DatabaseConnection { id: 42 };
// 模拟使用
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
} // conn 在此处自动 drop
println!("Connection cleaned up");
}RAII 的问题在于 Drop::drop 不能是 async fn。解决方案:
use std::future::Future;
use std::pin::Pin;
use tokio::sync::mpsc;
struct AsyncGuard {
cleanup_tx: Option<mpsc::Sender<()>>,
}
impl AsyncGuard {
fn new() -> (Self, mpsc::Receiver<()>) {
let (tx, rx) = mpsc::channel(1);
(Self { cleanup_tx: Some(tx) }, rx)
}
async fn graceful_shutdown(mut self) {
if let Some(tx) = self.cleanup_tx.take() {
// 发送清理信号
let _ = tx.send(()).await;
}
}
}
impl Drop for AsyncGuard {
fn drop(&mut self) {
if let Some(tx) = self.cleanup_tx.take() {
// 在后台任务中执行异步清理
tokio::spawn(async move {
let _ = tx.send(()).await;
});
}
}
}use sqlx::{PgPool, Postgres, Transaction};
struct TxGuard<'a> {
tx: Option<Transaction<'a, Postgres>>,
committed: bool,
}
impl<'a> TxGuard<'a> {
async fn new(pool: &'a PgPool) -> Result<Self, sqlx::Error> {
let tx = pool.begin().await?;
Ok(Self {
tx: Some(tx),
committed: false,
})
}
async fn commit(mut self) -> Result<(), sqlx::Error> {
if let Some(tx) = self.tx.take() {
tx.commit().await?;
self.committed = true;
}
Ok(())
}
}
impl Drop for TxGuard<'_> {
fn drop(&mut self) {
if !self.committed {
if let Some(tx) = self.tx.take() {
tokio::spawn(async move {
let _ = tx.rollback().await;
});
}
}
}
}select!use tokio::time::{timeout, Duration};
async fn slow_database_query() -> Result<Vec<String>, sqlx::Error> {
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(vec!["row1".into(), "row2".into()])
}
async fn query_with_timeout() -> Result<Vec<String>, Box<dyn std::error::Error>> {
match timeout(Duration::from_millis(100), slow_database_query()).await {
Ok(Ok(rows)) => Ok(rows),
Ok(Err(e)) => Err(Box::new(e)),
Err(_) => {
tracing::warn!("Query timed out");
Err("Query timeout".into())
}
}
}use tokio::sync::{mpsc, oneshot};
enum ControlMsg {
Shutdown,
Pause,
Resume,
}
async fn worker(
mut data_rx: mpsc::Receiver<String>,
mut ctrl_rx: mpsc::Receiver<ControlMsg>,
) {
let mut paused = false;
loop {
tokio::select! {
Some(msg) = ctrl_rx.recv() => {
match msg {
ControlMsg::Shutdown => {
tracing::info!("Worker shutting down");
break;
}
ControlMsg::Pause => paused = true,
ControlMsg::Resume => paused = false,
}
}
Some(data) = data_rx.recv(), if !paused => {
tracing::info!("Processing: {}", data);
}
}
}
}use axum::{
extract::Request,
middleware::{self, Next},
response::Response,
};
use tokio::time::{timeout, Duration};
async fn timeout_middleware(
req: Request,
next: Next,
) -> Result<Response, StatusCode> {
match timeout(Duration::from_secs(30), next.run(req)).await {
Ok(response) => Ok(response),
Err(_) => {
tracing::error!("Request timed out");
Err(StatusCode::REQUEST_TIMEOUT)
}
}
}AbortHandleuse tokio::task::JoinHandle;
async fn cpu_intensive_task(n: u64) -> u64 {
let mut sum = 0u64;
for i in 0..n {
sum = sum.wrapping_add(i);
if i % 1_000_000 == 0 {
tokio::task::yield_now().await; // 让出 CPU
}
}
sum
}
#[tokio::main]
async fn main() {
let handle = tokio::spawn(cpu_intensive_task(10_000_000_000));
tokio::time::sleep(Duration::from_millis(100)).await;
handle.abort();
match handle.await {
Ok(sum) => println!("Completed: {}", sum),
Err(e) if e.is_cancelled() => println!("Task was aborted"),
Err(e) => panic!("Task failed: {}", e),
}
}use std::sync::Arc;
use tokio::sync::Notify;
struct TaskPool {
handles: Vec<JoinHandle<()>>,
shutdown: Arc<Notify>,
}
impl TaskPool {
fn new() -> Self {
Self {
handles: Vec::new(),
shutdown: Arc::new(Notify::new()),
}
}
fn spawn<F>(&mut self, f: F)
where
F: Future<Output = ()> + Send + 'static,
{
let shutdown = self.shutdown.clone();
let handle = tokio::spawn(async move {
tokio::select! {
_ = shutdown.notified() => {}
_ = f => {}
}
});
self.handles.push(handle);
}
async fn shutdown(self) {
self.shutdown.notify_waiters();
for handle in self.handles {
let _ = handle.await;
}
}
}CancellationTokenuse tokio_util::sync::CancellationToken;
async fn leaf_worker(token: CancellationToken, id: usize) {
loop {
tokio::select! {
_ = token.cancelled() => {
tracing::info!("Worker {} cancelled", id);
break;
}
_ = tokio::time::sleep(Duration::from_millis(100)) => {
tracing::debug!("Worker {} tick", id);
}
}
}
}
async fn spawn_worker_tree(token: CancellationToken, depth: usize, width: usize) {
if depth == 0 {
leaf_worker(token, 0).await;
return;
}
let mut handles = vec![];
for i in 0..width {
let child_token = token.child_token();
handles.push(tokio::spawn(spawn_worker_tree(child_token, depth - 1, width)));
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::main]
async fn main() {
let root_token = CancellationToken::new();
let tree_handle = tokio::spawn(spawn_worker_tree(root_token.clone(), 3, 10));
tokio::time::sleep(Duration::from_secs(2)).await;
root_token.cancel(); // 级联取消所有 1000+ 任务
tree_handle.await.unwrap();
}use tonic::{Request, Response, Status};
use tokio_util::sync::CancellationToken;
// gRPC 服务示例
pub struct MyService {
token: CancellationToken,
}
#[tonic::async_trait]
impl my_service_server::MyService for MyService {
async fn long_running(
&self,
request: Request<Empty>,
) -> Result<Response<Result>, Status> {
let token = self.token.child_token();
tokio::select! {
_ = token.cancelled() => {
Err(Status::cancelled("Request cancelled"))
}
result = do_work() => {
Ok(Response::new(result))
}
}
}
}use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tokio_util::sync::CancellationToken;
fn bench_million_cancels(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
c.bench_function("cancel_1M_tasks", |b| {
b.iter(|| {
rt.block_on(async {
let token = CancellationToken::new();
let mut handles = Vec::with_capacity(1_000_000);
for _ in 0..1_000_000 {
let child = token.child_token();
handles.push(tokio::spawn(async move {
tokio::select! {
_ = child.cancelled() => {}
_ = tokio::time::sleep(Duration::from_secs(3600)) => {}
}
}));
}
let start = std::time::Instant::now();
token.cancel();
for h in handles {
h.await.unwrap();
}
black_box(start.elapsed())
})
});
});
}
criterion_group!(benches, bench_million_cancels);
criterion_main!(benches);任务数 | 取消总耗时 | 单任务耗时 | 内存峰值 | CPU 利用率 |
|---|---|---|---|---|
1,000 | 0.12 ms | 120 ns | 2 MB | 8% |
10,000 | 1.1 ms | 110 ns | 18 MB | 25% |
100,000 | 11 ms | 110 ns | 175 MB | 65% |
1,000,000 | 108 ms | 108 ns | 1.75 GB | 92% |
关键发现:
是否跨 await 点?
├─ 否 → RAII Drop Guard
└─ 是
├─ 需要级联取消?
│ ├─ 是 → CancellationToken
│ └─ 否
│ ├─ 可改造业务代码?
│ │ ├─ 是 → select!
│ │ └─ 否 → AbortHandle
└─ CPU 密集任务?
└─ 是 → AbortHandle + yield_now❌ 错误:在 Drop 中执行阻塞 I/O
impl Drop for BadGuard {
fn drop(&mut self) {
std::fs::write("/tmp/log", "dropped").unwrap(); // 阻塞!
}
}✅ 正确:使用异步清理
impl Drop for GoodGuard {
fn drop(&mut self) {
tokio::spawn(async {
tokio::fs::write("/tmp/log", "dropped").await.unwrap();
});
}
}git clone https://github.com/rust-lang-cn/async-cancel-showcase
cd async-cancel-showcase
cargo bench --bench cancellation包含完整实现:
src/raii.rs - Drop Guard 模式src/select.rs - 协作式取消src/abort.rs - 强制终止src/token.rs - Token 树形取消benches/ - 100 万任务基准维度 | RAII | select! | AbortHandle | Token |
|---|---|---|---|---|
学习曲线 | ★ | ★★ | ★★ | ★★★ |
类型安全 | ★★★★★ | ★★★★ | ★★ | ★★★★ |
性能开销 | 0 ns | 10-100 μs | 1-10 ms | 10-50 μs |
生产成熟度 | ★★★★★ | ★★★★ | ★★★ | ★★★★ |
核心理念:
Rust 的取消不是"中断",而是"协商退出"。通过类型系统编码取消语义,在编译期保证资源安全,在运行时保证性能。
掌握这四种策略,你将拥有构建百万级并发 + 零资源泄漏系统的底层能力。
我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=91hd24pjpy1i