
异步管道模式 - 渴望工作
本文最后更新于 2025-01-23,本文发布时间距今超过 90 天, 文章内容可能已经过时。最新内容请以官方内容为准
异步管道模式 - 渴望工作
关于流的发现
在上一篇文章中,我们深入研究了 Rust 的 futures::Stream
的运行特性。我们发现了一些相当令人惊讶的结果。在我们预期会有高度并发执行的地方,却发现了一些阻碍点。
我们将流的执行可视化为动画。以下这个流:
async fn async_work(i32) -> i32 {...}
#[tokio::main]
async fn main() {
let stream = stream::iter(0..10)
.map(async_work)
.buffered(3)
.map(async_work)
.buffered(3);
while let Some(next) = stream.next().await {
println!("finished working on: {}", next);
}
}
其执行情况如下:
花几秒钟困惑地审视这个动画。值得注意的问题如下:
- 第一个
map
中的Future
似乎会在第二个map
中的Future
执行时“挂起”,反之亦然。 - 尽管期望有 3 个并发运行的
Future
,但当一个Future
完成时,流不会立即拉取新的工作单元。 - 尽管期望有 3 个并发运行的
Future
,但第一个map
必须等待第二个map
完成后才能拉取新的工作单元。
现在,看看这个替代的异步管道方法:
再困惑地审视一下这个动画,瞧——所有问题都消失了!在本文中,我将描述一种异步管道的替代方法。
为什么 futures::Stream
不是最优的?
要真正理解这种阻碍的原因,你应该深入研究 futures::stream
代码。我想说 Rust 的 futures
的惰性本质与之有关。
Rust 的 futures
本质上是惰性的。一个 Future
只有在被 await
或轮询时才会执行。因此,future::Stream
也有类似的惰性——当某个组件准备好接收时才会拉取工作。这种惰性使得流的实现有点尴尬。每个流组件要么轮询其输入,要么执行其工作,而不是两者兼顾。当一个 buffered
流操作等待其内部挂起的 Future
列表时,它不会轮询上游,这反过来又使上游处于空闲状态。
我无意贬低 futures
或 Stream
,这个 crate 和 API 是很棒的 Rust 异步基础。我将提出一种可能适用于某些用例的替代方案。
异步管道模式
我们感兴趣的用例是对某个流或数据集合进行一系列异步操作。例如,想象一个图像处理应用程序:
- 应用程序接收一个图像 URL 列表,然后:
- 发送 HTTP 请求下载图像。
- 异步处理图像。
- 将结果保存到对象存储中,例如 AWS S3。
当然,这个示例可以通过一个简单的带有一些异步调用的 for
循环来实现。别误会——这是一个完全有效的实现,而且你可能应该从这个开始。
// 有时完全有效
for url in urls {
let image = download_image(url).await;
let processed_image = process_image(image).await;
save_to_s3(processed_image).await;
}
在一些更复杂的实际用例中,你需要面对一些约束和要求:
- 并发:我们有多个核心可以利用,也有大量带宽可以利用。逐个处理每个 URL 会导致大量资源闲置,执行速度缓慢。
- 速率限制:这是并发的另一面。如果我们试图一次处理所有 URL,我们会立即达到资源上限。我们会使 CPU 核心满载,耗尽所有内存,并达到带宽上限。在我们的示例中,我们从远程服务下载图像。如果我们不控制请求速率,也可能会给它带来麻烦。实际上,我们可以说每个管道步骤都有自己的并发控制参数。例如,也许我们的图像处理对计算资源要求很高,我们希望增加处理它的 CPU 核心数量,而写入 S3 非常快,可以分配较少的 CPU 时间。
- 背压:有时管道中的一个步骤比平时花费的时间更长。当这种情况发生时,我们希望上游步骤继续产生结果。另一方面,我们应该控制这种缓冲的大小。在我们的示例中,图像很大,我们不想在内存中积累太多图像。
为了最优地满足这些约束,我们将每个步骤视为一个独立的“任务”,每个任务尽可能快地完成自己的部分,并将结果转发给下一个“任务”。如果你愿意,可以称之为“管道”。
任务和通道
本文中描述的方法利用了生成的任务和通道。任务
是积极的;它们接收的 Future
会立即运行,而不与其他 Future
或任务耦合。每个管道步骤的这种“独立性”将使我们能够以更自由的方式控制并发。
让我们从生成一个下载图像并将结果转发到输出通道的任务开始:
let (url_sender, mut url_receiver) = mpsc::channel(64);
let (image_sender, mut image_receiver) = mpsc::channel(64);
tokio::spawn(async move {
while let Some(url) = url_receiver.recv().await {
let image = download_image(url).await;
image_sender.send(image).await.unwrap(); // TODO: 处理错误!
}
});
for url in urls {
url_sender.send(url).await.unwrap();
}
while let Some(image) = image_receiver.recv().await {
println!("got image");
}
注意我们生成的任务尽可能快地下载图像,即使由于某些不可预见的原因 println!()
变得很慢。
将管道连接成管道线
在我们看到了单个管道任务之后,让我们将几个任务连接成管道。
let (url_sender, mut url_receiver) = mpsc::channel(64);
let (image_sender, mut image_receiver) = mpsc::channel(64);
let (processed_sender, mut processed_receiver) = mpsc::channel(64);
let (output_sender, mut output_receiver) = mpsc::channel(64);
tokio::spawn(async move {
while let Some(url) = url_receiver.recv().await {
let image = download_image(url).await;
image_sender.send(image).await.unwrap(); // TODO: 处理错误!
}
});
tokio::spawn(async move {
while let Some(image) = image_receiver.recv().await {
let processed_image = process_image(image).await;
processed_sender.send(processed_image).await.unwrap(); // TODO: 处理错误!
}
});
tokio::spawn(async move {
while let Some(image) = processed_receiver.recv().await {
let image_url = image.url.clone();
save_image(image).await;
output_sender.send(image_url).await.unwrap(); // TODO: 处理错误!
}
});
for url in urls {
url_sender.send(url).await.unwrap();
}
while let Some(url) = output_receiver.recv().await {
println!("done with {url}");
}
增加并发
很好,我们有 3 个任务在积极地进行处理。下载、处理和保存都是相对于彼此并发执行的,但是一次下载多个图像呢?当前代码在任务内部仍然是串行的。为了解决这个问题,让我们引入 FuturesUnordered
来增加一些并发性。
tokio::spawn(async move {
let mut futures = FuturesUnordered::new();
loop {
tokio::select! {
// 接收新的 url 进行下载
Some(url) = url_receiver.recv() => {
futures.push(download_image(url));
},
// 获取已完成下载的结果
Some(image) = futures.next() => {
image_sender.send(image).await.unwrap(); // TODO - 处理错误!
},
// 我们用完了要接收的 url 或要等待的 future。任务完成
else => break
}
}
});
限制并发也很容易。我将把它留作读者的练习(示例代码中有答案)。
背压
多疑的读者此时可能会想——我们的内存使用量不会失控吗?如果某个任务尽可能快地运行,但它的后续任务特别慢,我们会不断积累越来越多的结果。好吧,多疑的读者,请注意我在示例中使用了有界通道。我们可以积累的结果数量是有界的,并且每个任务都是独立控制的。当输出通道已满时,结果的推送会挂起,从而停止计算。
实际上,这种有界通道的使用为我们提供了一种优雅的背压解决方案。
背压允许管道处理偶尔的缓慢情况,在内存中积累有限数量的结果,避免空闲。
这是背压好处的可视化。在这个动画中,管道的第二阶段比第一阶段慢。没有背压时,一旦第一阶段达到其最大的 3 个并发 Future
,它就会空闲并等待下一步完成工作。在这个示例中,我们配置背压以允许在第二阶段工作时再处理 3 个额外的项目。注意这种变化如何减少空闲时间并使管道更快地完成处理。
错误处理
我们有了一个工作的异步管道,但像一个天真的新手一样,我们忽略了错误处理。让我们来解决这个问题。我们需要处理两种主要情况:
- 输出通道已关闭。
- 我们的一个任务发生了恐慌。
输出通道关闭
在我们的异步管道中,每个任务通过一个出站 Sender
转发其结果。如果这个 Sender
的 Receiver
被丢弃,sender.send()
将返回一个错误。如果没有人监听我们的结果,生成它们就没有意义,对吧?让我们为此情况添加处理:
tokio::spawn(async move {
while let Some(url) = url_receiver.recv().await {
let image = download_image(url).await;
if let Err(err) = image_sender.send(image).await {
break;
}
}
});
如果输出通道不再活动,我们就中断接收循环并让任务关闭。
任务恐慌
这种情况稍微复杂一些。在单线程程序中,默认情况下,当主线程恐慌时,进程会展开栈,打印出错误并退出。由于我们管道中的代码在一个 tokio
任务中,恐慌会终止该任务,但我们的主线程会继续运行。管道任务恐慌的期望结果是什么?我们应该终止进程吗?我们应该重新创建管道吗?无论如何,Rust 要求我们处理这种情况,并给我们自由来编写处理程序。
要处理一个恐慌的任务,我们需要知道任务何时终止。为此,我们将使用 spawn
的返回值——JoinHandle
。
let h1 = tokio::spawn(async move {
//...
});
let h2 = tokio::spawn(async move {
//...
});
let h3 = tokio::spawn(async move {
//...
});
// 首先消耗管道输出
while let Some(url) = output_receiver.recv().await {
println!("done with {url}");
}
// 我们是因为成功还是失败而到达这里?
match tokio::try_join!([h1, h2, h3]) {
Ok(_) => report_success(),
Err(err) => handle_error(err) // 恐慌(如果你愿意的话)
}
在完全消耗我们的管道输出通道后,我们需要确定管道是成功终止还是因为某个步骤恐慌而终止。
请注意,我们没有讨论应用程序级别的错误。每个任务都可以转发一个 Result<_, Error>
,可以像往常一样在各个管道阶段或在输出消费中进行处理。
终止
我们的管道由一系列任务构建而成。当我们完成处理时,应该如何终止这些任务呢?这是我们创建的任务 - 通道链的另一个优雅特性。我们的管道通过一个 Receiver
接收其输入。当相应的 Sender
被丢弃并且 Receiver
已被完全消耗时,while
循环将自然退出,丢弃下一个任务的输入 Sender
。这将创建一个级联的关闭任务。第一个任务将消耗其输入并退出,然后第二个任务将做同样的事情,依此类推。如果你问我的话,这相当优雅。
将概念打包到一个 crate 中
异步管道模式很棒,但相当冗长。你必须为管道的每个组件创建输入和输出通道并生成任务。我创建了 Pumps
crate,将这个概念包装成一个用户友好的 API。Pumps
基本上是异步管道模式的构建器。每个方法都会创建一个任务和一个输出通道,该通道会被转发到下一个方法的输入通道。
这是我们的示例在使用 Pumps
时的样子:
let (mut output_receiver, join_handle) = pumps::Pipeline::from_iter(urls)
.map(download_image, Concurrency::concurrent_unordered(4))
.backpressure(64)
.map(process_image, Concurrency::concurrent_unordered(8))
.map(save_to_s3, Concurrency::serial())
.build();
while let Some(url) = output_receiver.recv().await {
println!("done with {url}");
}
基准测试
我描述了一种异步管道的替代方法,但它在实践中与 Stream
相比如何呢?
为了基准测试这些库的性能,我将使用 futures::Stream
和 Pumps
定义一个类似的管道。示例管道由三个任务组成,每个任务运行一个异步函数,运行时间在 5 - 10ms 之间均匀分布。随机运行时间用于模拟任务运行时间在现实生活中的波动。任务运行时间在基准测试开始时随机选择,并传递给每个库测试。测量的运行时间是作为输入提供给管道的 1000 个元素的处理时间。
我们正在比较以下两种情况:
let mut stream = input
.map(/* 任务 1 */)
.buffer_unordered(concurrency)
.map(/* 任务 2 */)
.buffer_unordered(concurrency)
.map(/* 任务 3 */)
.buffer_unordered(concurrency);
与:
let (mut receiver, handler) = pumps::Pipeline::from_iter(input)
.map(/* 任务 1 */, Concurrency::concurrent_unordered(concurrency))
.map(/* 任务 2 */, Concurrency::concurrent_unordered(concurrency))
.map(/* 任务 3 */, Concurrency::concurrent_unordered(concurrency))
.build();
让我们看看每个任务的并发数对管道运行时间的影响:
在低并发数下,差异非常显著。然而,我们确实看到了一些收益递减的情况。
让我们看看更高的并发数:
在高并发情况下,Pumps
的优势消失了。实际上,任务和通道的开销使其成为较慢的选择。
背压的影响
如前所述,背压可以通过允许任务在下游任务变慢时继续工作来提高管道效率。为了演示这种效果,让我们对一个三步管道进行基准测试,故意在某些执行中减慢任务。这次任务运行时间从 5 - 10ms 范围内选择,但在 20% 的执行中,运行时间为 20ms。
在结果中,我们看到在低并发数下有很好的性能提升。背压的好处实际上取决于你的异步任务的运行时间分布,对于“通常快,有时慢”的场景效果最佳。
结论
我开始这项研究是为了解决我在使用 Stream
时遇到的问题,并发现了异步管道模式。这种模式利用基本的 Rust 和 Tokio 并发原语为并发执行提供了一些优雅的解决方案。这种方法,嗯,是从多线程世界借鉴来的,在那里我们只是生成普通的线程而不是 Tokio 任务,但我发现它在异步世界中也能起作用。
基准测试表明,在低并发用例中收益不错,但在高并发数下,任务和通道的开销开始累积,减少了收益。一如既往,你应该根据你的用例选择解决方案。