Rustコレクションのasyncフロー

Rustの非同期処理はasync/awaitで記述できますが、非同期処理を含むクロージャでasync/awaitを適切に記述できないケースがあります。

たとえばコレクションの各要素に対して非同期処理を行いたい場合には、Streamを利用する必要があります。

use futures::stream::{self, StreamExt};

#[tokio::test]
async fn it_returns_ok {
  let url_collection = vec![
    "http://example.com/endpoints/a",
    "http://example.com/endpoints/b",
    "http://example.com/endpoints/c"
  ];
  let url_stream = stream::iter(url_collection);

  url_stream.for_each(|url| async move {
      let res = reqwest::get(url).send().await.unwrap();
      assert_eq!(res.status(), request::StatusCode::OK);
  });
  url_stream.await;
}

この例のハイライトは、 futures::stream::StreamExtトレイトが提供するfor_each()メソッドを利用している点です。

そのために、stream::iter()でコレクションをストリームに変換しています。
クロージャのmoveは、借用だとコンパイラがurlのライフタイムに革新を失ってエラーになるためmoveしています。

なおこの例のように、ランタイムがtokioであっても、futuresクレートを利用できます。
tokioにも tokio_stream::StreamExtトレイトがあるのですが、メソッドのバリエーションが不足しています。

非同期のフローをStreamとして実装される方針は決まっているものの、現時点では仕様が決まっていないため、目的に合ったクレートを利用して実装することになります。

⁋ 2021/10/12↻ 2025/01/15
中馬崇尋
Chuma Takahiro