Rustの非同期処理はasync/awaitで記述できますが、非同期処理を含むクロージャでasync/awaitを適切に記述できないケースがあります。
たとえばコレクションの各要素に対して非同期処理を行いたい場合には、Streamを利用する必要があります。
use futures::stream::{self, StreamExt};
#[tokio::test]
async fn it_returns_ok {
let url_collection = vec![
"http://example.com/endpoints/a",
"http://example.com/endpoints/b",
"http://example.com/endpoints/c"
];
let url_stream = stream::iter(url_collection);
url_stream.for_each(|url| async move {
let res = reqwest::get(url).send().await.unwrap();
assert_eq!(res.status(), request::StatusCode::OK);
});
url_stream.await;
}
この例のハイライトは、
futures::stream::StreamExtトレイトが提供するfor_each()
メソッドを利用している点です。
そのために、stream::iter()
でコレクションをストリームに変換しています。
クロージャのmove
は、借用だとコンパイラがurl
のライフタイムに革新を失ってエラーになるためmove
しています。
なおこの例のように、ランタイムがtokio
であっても、futures
クレートを利用できます。
tokioにも
tokio_stream::StreamExtトレイトがあるのですが、メソッドのバリエーションが不足しています。
非同期のフローをStreamとして実装される方針は決まっているものの、現時点では仕様が決まっていないため、目的に合ったクレートを利用して実装することになります。
⁋ 2021/10/12↻ 2025/01/15
中馬崇尋
Chuma Takahiro
Chuma Takahiro