If you are already reading my blog, you may have noticed that I see a lot of potential in the Rust programming language. The safety and efficiency properties of Rust make it the programming language of “fearless concurrency”, and in particular the async/.await syntax, which was stabilized about a year ago, really makes asynchronous programming a core feature of Rust.

The downside of this new asynchronous syntax is that the types of various objects become more complicated, with a mix of lambdas, futures and streams. And complex types make for complex error messages from the compiler, which can appear mysterious to the programmer at first glance. This can cause quite a bit of head-scratching even for proficient Rust programmers, as I’ve started discussing in a previous blog post.

However, after you get past this non-trivial initial learning phase, you can unlock the potential of async code in Rust!

In this series of blog posts, I want to share my experience learning how to use and compose “streams”, an asynchronous concept defined in the futures library, and soon to be added to the standard library.

We’ll go step-by-step through some case studies, looking at the various error messages from the compiler, and learning many tips along the way about how to prevent these compilation errors. Case studies are of course simplified examples, but hopefully relate to many asynchronous problems involving streams.

You’ll get the most out of this blog post if you already have some knowledge of Rust, but you’re not expected to be familiar with the specifics of async programming in Rust.

The full code examples are available on this GitHub repository. You can run each example (or obtain a compilation error) with cargo run --example <example name>. At the time of writing, I’ve tested them with Rust version 1.50.0.

$ cargo --version
cargo 1.50.0 (f04e7fab7 2021-02-04)

Case study: flattening paginated search results

In this case study, we will consider the problem of processing paginated search results. Typically, we want to process a stream of requests, each request being of the form fetch_resource(id). However, we don’t know in advance the IDs of resources to fetch - for this we will first have to run some search query which will give us a list of IDs.

The “trick” is that the search query can yield an unbounded number of results (for example, sorted by relevance), so in practice the search results will be paginated. For example search(query, page=0) gives the first 10 IDs, search(query, page=1) gives the next 10 IDs, and so on.

We consider that both searching for IDs and fetching a resource are asynchronous operations - typically we query some API over the network. Overall, this paginated search pattern can occur in many kinds of APIs.

Hello, async world!

In this blog post, we’ll be using the Tokio asynchronous runtime, together with the futures library. Besides, we will simulate asynchronous operations - such as network requests - with Tokio’s sleep function, each time with a random delay. For this setup, we will therefore use the following dependencies:

  • tokio, an asynchronous runtime for Rust. We’ll use the following features: macros which provides the tokio::main decorator, rt-multi-thread for a multi-threaded runtime under the hood, and time which provides the asynchronous sleep function.
  • futures, which provides useful tools for asynchronous programming, such as streams.
  • rand, to sample a random duration to simulate each asynchronous operation.
  • lazy_static, as a convenience to measure time elapsed from a global starting instant across functions.

At the time of writing, I’ve used the following versions as shown in the Cargo.toml file.

[dependencies]
futures = "0.3.13"
lazy_static = "1.4.0"
rand = "0.8.3"
tokio = { version = "1.4.0", features = ["macros", "rt-multi-thread", "time"] }

Our Hello World program will be the following. We define an asynchronous get_page function, which simply returns 10 sequential IDs for each page, but first sleeps for a random duration to simulate an asynchronous request in the background (network request, database query, etc.).

use lazy_static::lazy_static;
use rand::distributions::{Distribution, Uniform};
use std::time::Duration;
use tokio::time::{sleep, Instant};

lazy_static! {
    static ref START_TIME: Instant = Instant::now();
}

#[tokio::main]
async fn main() {
    let page = get_page(42).await;
    println!("Page #42: {:?}", page);
}

async fn get_page(i: usize) -> Vec<usize> {
    let millis = Uniform::from(0..10).sample(&mut rand::thread_rng());
    println!(
        "[{}] # get_page({}) will complete in {} ms",
        START_TIME.elapsed().as_millis(),
        i,
        millis
    );

    sleep(Duration::from_millis(millis)).await;
    println!(
        "[{}] # get_page({}) completed",
        START_TIME.elapsed().as_millis(),
        i
    );

    (10 * i..10 * (i + 1)).collect()
}

Here is a sample output. Note that the sleep time is not 100% accurate: here it took one millisecond more than what was requested.

$ cargo run --release --example 01-pages-hello-ok
[0] # get_page(42) will complete in 5 ms
[6] # get_page(42) completed
Page #42: [420, 421, 422, 423, 424, 425, 426, 427, 428, 429]

Primer: creating a stream of pages

Our next step will be to define a stream of page results. The stream concept is defined in the futures library, and is the asynchronous version of an iterator, i.e. each value in the stream is produced asynchronously.
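To make the iterator analogy concrete, here is a minimal sketch that uses only the standard library. MiniStream, Counter, NoopWaker and drain are names I made up for this illustration; MiniStream only mirrors the shape of the poll_next method of the real Stream trait from the futures library, and is not a replacement for it.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local, simplified stand-in for the `Stream` trait (hypothetical name).
trait MiniStream {
    type Item;

    /// Like `Iterator::next`, but may answer `Poll::Pending` ("not yet").
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Yields `next`, `next + 1`, ... up to `limit` (exclusive), always immediately.
struct Counter {
    next: usize,
    limit: usize,
}

impl MiniStream for Counter {
    type Item = usize;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<usize>> {
        if self.next < self.limit {
            let value = self.next;
            self.next += 1;
            Poll::Ready(Some(value))
        } else {
            Poll::Ready(None)
        }
    }
}

/// Waker that does nothing; good enough for a busy-polling loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream until it ends and collects every item.
fn drain<S: MiniStream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            // A real runtime would sleep until woken; this sketch just retries.
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

Calling drain(Counter { next: 0, limit: 3 }) collects the three values 0, 1 and 2. The only difference with an iterator is the extra Poll layer, which lets a stream say that its next item is not available yet.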

For now, we will define a get_pages function, which produces a stream of search results as a list of IDs, without actually querying the underlying resources. This stream will be infinite, but we can then query the first N pages of it with the take method, and collect the results into a vector via the collect method.

use futures::stream::{self, Stream, StreamExt};

#[tokio::main]
async fn main() {
    println!("First 10 pages:\n{:?}", get_n_pages(10).await);
}

async fn get_n_pages(n: usize) -> Vec<Vec<usize>> {
    get_pages().take(n).collect().await
}

fn get_pages() -> impl Stream<Item = Vec<usize>> {
    todo!()
}

We are leveraging the impl Trait syntax for the return type of get_pages. This is very useful for various reasons.

  1. From an API point of view, we don’t care about the implementation details, what matters is that the result type implements the Stream trait. The impl Trait syntax is a concise and readable way of expressing that.
  2. We don’t need to update the function signature if we change the underlying implementation. Indeed, adding any of the stream combinator methods changes the resulting type, even if the important part - the Item type - stays the same.
  3. Some types are very difficult to spell otherwise, in particular if lambdas are involved.
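The same reasoning applies to plain iterators, which makes for a smaller illustration. In the sketch below (function names are mine), the two functions hide different concrete types, a Map adapter and a Filter adapter, each wrapping a closure whose type cannot be written down, yet they share exactly the same signature.

```rust
/// Even numbers, built with `map`: the hidden type is a `Map<RangeFrom<usize>, _>`.
fn evens_via_map() -> impl Iterator<Item = usize> {
    (0..).map(|i| 2 * i)
}

/// Even numbers, built with `filter`: the hidden type is a `Filter<RangeFrom<usize>, _>`.
/// The signature is unchanged, so callers are unaffected by the switch.
fn evens_via_filter() -> impl Iterator<Item = usize> {
    (0..).filter(|i| i % 2 == 0)
}

/// Callers only rely on `Iterator<Item = usize>`.
fn first_evens(n: usize) -> Vec<usize> {
    evens_via_map().take(n).collect()
}
```

Swapping evens_via_map for evens_via_filter inside first_evens requires no other change, which is the point of items 1 and 2 above.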

So how do we build a stream for the get_pages function? We can first generate a stream of integers 0, 1, 2, ... (the page numbers), and then call get_page on each of them.

Generating the integers can be done by converting an iterator over the 0.. range into a stream via the stream::iter function. We can then try to use the map method to call get_page on each page number.

// First attempt.
fn get_pages() -> impl Stream<Item = Vec<usize>> {
    stream::iter(0..).map(|i| get_page(i))
}

However, this yields the following somewhat obscure error.

error[E0271]: type mismatch resolving `<[closure@examples/02-pages-stream-1-fail.rs:21:27: 21:42] as FnOnce<(usize,)>>::Output == Vec<usize>`
  --> examples/02-pages-stream-1-fail.rs:20:19
   |
20 | fn get_pages() -> impl Stream<Item = Vec<usize>> {
   |                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected opaque type, found struct `Vec`
...
24 | async fn get_page(i: usize) -> Vec<usize> {
   |                                ---------- the `Output` of this `async fn`'s expected opaque type
   |
   = note: expected opaque type `impl futures::Future`
                   found struct `Vec<usize>`
   = note: required because of the requirements on the impl of `futures_util::fns::FnOnce1<usize>` for `[closure@examples/02-pages-stream-1-fail.rs:21:27: 21:42]`
help: consider `await`ing on the `Future`
   |
20 | fn get_pages() -> impl Stream<Item = Vec<usize>>.await {
   |                                                 ^^^^^^

The “help” part of the error message doesn’t help us in this case: one cannot append .await to a type…

Somehow, the lambda that we pass to the map method expects a future. Maybe wrapping it into an async block would help?

// Second attempt.
fn get_pages() -> impl Stream<Item = Vec<usize>> {
    stream::iter(0..).map(|i| async { get_page(i) })
}

No, this still doesn’t work.

error[E0271]: type mismatch resolving `<[closure@examples/03-pages-stream-2-fail.rs:21:27: 21:52] as FnOnce<(usize,)>>::Output == Vec<usize>`
  --> examples/03-pages-stream-2-fail.rs:20:19
   |
20 | fn get_pages() -> impl Stream<Item = Vec<usize>> {
   |                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected opaque type, found struct `Vec`
21 |     stream::iter(0..).map(|i| async { get_page(i) })
   |                                     --------------- the expected generator
...
24 | async fn get_page(i: usize) -> Vec<usize> {
   |                                ---------- the `Output` of this `async fn`'s one of the expected opaque types
   |
   = note: expected opaque type `impl futures::Future`
                   found struct `Vec<usize>`
   = note: required because of the requirements on the impl of `futures_util::fns::FnOnce1<usize>` for `[closure@examples/03-pages-stream-2-fail.rs:21:27: 21:52]`

The error is quite similar, but the async block is highlighted as “the expected generator”. Maybe we need to .await inside of it?

// Third attempt.
fn get_pages() -> impl Stream<Item = Vec<usize>> {
    stream::iter(0..).map(|i| async { get_page(i).await })
}

We didn’t make much progress…

error[E0271]: type mismatch resolving `<[closure@examples/04-pages-stream-3-fail.rs:21:27: 21:58] as FnOnce<(usize,)>>::Output == Vec<usize>`
  --> examples/04-pages-stream-3-fail.rs:20:19
   |
20 | fn get_pages() -> impl Stream<Item = Vec<usize>> {
   |                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected opaque type, found struct `Vec`
21 |     stream::iter(0..).map(|i| async { get_page(i).await })
   |                                     --------------------- the expected generator
   |
   = note: expected opaque type `impl futures::Future`
                   found struct `Vec<usize>`
   = note: required because of the requirements on the impl of `futures_util::fns::FnOnce1<usize>` for `[closure@examples/04-pages-stream-3-fail.rs:21:27: 21:58]`

Looking a bit further through the documentation, we find that there is also a then method. Let’s try that!

// Fourth attempt.
fn get_pages() -> impl Stream<Item = Vec<usize>> {
    stream::iter(0..).then(|i| get_page(i))
}

Yes, this compiles :)

[0] # get_page(0) will complete in 7 ms
[8] # get_page(0) completed
[8] # get_page(1) will complete in 7 ms
[16] # get_page(1) completed
[16] # get_page(2) will complete in 1 ms
[18] # get_page(2) completed
[19] # get_page(3) will complete in 8 ms
[28] # get_page(3) completed
[28] # get_page(4) will complete in 1 ms
[30] # get_page(4) completed
[30] # get_page(5) will complete in 7 ms
[39] # get_page(5) completed
[39] # get_page(6) will complete in 8 ms
[48] # get_page(6) completed
[48] # get_page(7) will complete in 1 ms
[50] # get_page(7) completed
[50] # get_page(8) will complete in 2 ms
[54] # get_page(8) completed
[54] # get_page(9) will complete in 4 ms
[59] # get_page(9) completed
First 10 pages:
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 45, 46, 47, 48, 49], [50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69], [70, 71, 72, 73, 74, 75, 76, 77, 78, 79], [80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]]

I’ve also made a simple plotting tool to display a timeline of the requests, thanks to the plotters library.

[Figure: timeline of the requests]

So what happened here? If we look in terms of function signatures, combining definitions of the StreamExt trait with the Stream trait implementation for the Then struct and the Map struct, we essentially have the following types.

fn map<T, F>(self, f: F) -> impl Stream<Item = T>
where
    F: FnMut(Self::Item) -> T;

fn then<Fut, F>(self, f: F) -> impl Stream<Item = <Fut as Future>::Output>
where
    F: FnMut(Self::Item) -> Fut,
    Fut: Future;

We can learn the following from our example: the get_page function returns a Future – because it’s an async function – whose Output is Vec<usize> – that’s simply the output of the async function. In other words, get_page implements FnMut(usize) -> impl Future<Output = Vec<usize>>.

When we call .then(), we therefore obtain a stream whose items are Vec<usize>, whereas if we call .map() we obtain a stream of Futures, which doesn’t match the Item = Vec<usize> that we declared in the return type.
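The same distinction can be reproduced with a plain iterator and the standard library only. In the sketch below, block_on is a deliberately naive busy-polling executor that I wrote as a stand-in for a real runtime such as Tokio, and get_page has the same shape as in this post but without the simulated delay; first_pages and NoopWaker are likewise names made up for this example.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Naive executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

/// Same shape as the post's `get_page`, minus the random sleep.
async fn get_page(i: usize) -> Vec<usize> {
    (10 * i..10 * (i + 1)).collect()
}

fn first_pages(n: usize) -> Vec<Vec<usize>> {
    // Analogue of `map`: each item is a future, no page has been computed yet.
    let futures = (0..n).map(|i| get_page(i));
    // Analogue of `then`: each future is driven to completion, one after the other.
    futures.map(|fut| block_on(fut)).collect()
}
```

The first map alone yields values of type impl Future<Output = Vec<usize>>; it is only the second step, which waits for each future, that turns them into Vec<usize>.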

Note that if we wrap our function call in another async block with the .then() method, we obtain an error again.

// A fifth attempt for the road.
fn get_pages() -> impl Stream<Item = Vec<usize>> {
    stream::iter(0..).then(|i| async { get_page(i) })
}

Compiling this gives:

error[E0271]: type mismatch resolving `<futures::stream::Then<futures::stream::Iter<RangeFrom<usize>>, impl futures::Future, [closure@examples/06-pages-stream-5-fail.rs:21:28: 21:53]> as Stream>::Item == Vec<usize>`
  --> examples/06-pages-stream-5-fail.rs:20:19
   |
20 | fn get_pages() -> impl Stream<Item = Vec<usize>> {
   |                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected opaque type, found struct `Vec`
...
24 | async fn get_page(i: usize) -> Vec<usize> {
   |                                ---------- the `Output` of this `async fn`'s expected opaque type
   |
   = note: expected opaque type `impl futures::Future`
                   found struct `Vec<usize>`
help: consider `await`ing on the `Future`
   |
20 | fn get_pages() -> impl Stream<Item = Vec<usize>>.await {
   |                                                 ^^^^^^

However, we can fix this by .awaiting inside of the async block.

// Sixth attempt.
fn get_pages() -> impl Stream<Item = Vec<usize>> {
    stream::iter(0..).then(|i| async { get_page(i).await })
}

Well, not quite yet…

error[E0373]: async block may outlive the current function, but it borrows `i`, which is owned by the current function
  --> examples/07-pages-stream-6-fail.rs:21:38
   |
21 |     stream::iter(0..).then(|i| async { get_page(i).await })
   |                                      ^^^^^^^^^^^-^^^^^^^^^
   |                                      |          |
   |                                      |          `i` is borrowed here
   |                                      may outlive borrowed value `i`
   |
note: async block is returned here
  --> examples/07-pages-stream-6-fail.rs:21:32
   |
21 |     stream::iter(0..).then(|i| async { get_page(i).await })
   |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
help: to force the async block to take ownership of `i` (and any other referenced variables), use the `move` keyword
   |
21 |     stream::iter(0..).then(|i| async move { get_page(i).await })
   |                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^

This time, the error message is helpful and suggests adding the move keyword to take ownership of the i parameter. But in the end, we are simply using a more verbose way of calling get_page directly.

// The following two functions are equivalent!
fn get_pages() -> impl Stream<Item = Vec<usize>> {
    stream::iter(0..).then(|i| async move { get_page(i).await })
}

fn get_pages() -> impl Stream<Item = Vec<usize>> {
    stream::iter(0..).then(|i| get_page(i))
}

In Rust, the async keyword creates a Future whereas the .await keyword consumes it to obtain its output. Therefore, they cancel each other out, and for our purposes async { foo.await } is equivalent to foo.
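Here is a small check of that equivalence using only the standard library. As an assumption of this sketch, block_on is a naive busy-polling executor standing in for a real runtime, get_page omits the simulated delay, and direct and wrapped are hypothetical helper names.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Naive executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

async fn get_page(i: usize) -> Vec<usize> {
    (10 * i..10 * (i + 1)).collect()
}

/// Runs the future returned by `get_page` directly.
fn direct(i: usize) -> Vec<usize> {
    block_on(get_page(i))
}

/// Wraps the call in an `async move` block and awaits inside it.
fn wrapped(i: usize) -> Vec<usize> {
    block_on(async move { get_page(i).await })
}
```

Both helpers produce the same page for any i; the wrapped version only adds one more layer of future around the same work.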

Decreasing latency via buffering

As a reminder, the function that we implemented in the previous section generates a stream of requests, but they are run sequentially. This can be problematic if the latency of each request is quite high, for example on the order of 100 ms or more for an actual request over the Internet.

fn get_pages() -> impl Stream<Item = Vec<usize>> {
    stream::iter(0..).then(|i| get_page(i))
}

Sample output:

[0] # get_page(0) will complete in 7 ms
[8] # get_page(0) completed
[8] # get_page(1) will complete in 7 ms
[16] # get_page(1) completed
[16] # get_page(2) will complete in 1 ms
[18] # get_page(2) completed
...

We could try to make all of the requests at the same time, but that’s usually not pragmatic – for example the API we are calling may be rate limited. Fortunately, the stream API supports several buffering combinators, which allow querying multiple items of the original stream concurrently and then yielding the results. These combinators are parameterized by what I’ll call the buffering factor: the maximum number of queries that can be buffered at any point in time.

There are two flavors of stream buffering.

  • Ordered buffering: multiple requests are run in parallel, but the results are kept in the original order of the stream.
  • Unordered buffering: multiple requests are run in parallel, and the results are yielded in a “first come first served” fashion, i.e. items that resolve first are yielded first.

For a given buffering factor, unordered buffering will generally result in a lower total latency for running all of the requests, compared to ordered buffering, because a slow request at the front of the buffer doesn’t hold back the start of new requests. So if you don’t care about the order of results, you should use unordered buffering. Otherwise, ordered buffering can still decrease the total latency by a noticeable amount in comparison with no buffering at all.
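To put numbers on this comparison, here is a back-of-the-envelope model of the three strategies. It is a simplification that I’m assuming for illustration, not the actual implementation of the combinators: latencies are exact, scheduling overhead is ignored, the consumer picks up results immediately, and in the ordered case a slot is only freed when the oldest outstanding result has been yielded. All function names are hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// No buffering: each request starts when the previous one has completed.
fn sequential_total(latencies: &[u64]) -> u64 {
    latencies.iter().sum()
}

/// Ordered buffering with factor `k`: request `i` starts when result `i - k`
/// has been yielded, and results are yielded in the original order.
fn ordered_total(latencies: &[u64], k: usize) -> u64 {
    assert!(k > 0, "the buffering factor must be positive");
    let mut yielded: Vec<u64> = Vec::with_capacity(latencies.len());
    for (i, &latency) in latencies.iter().enumerate() {
        let start: u64 = if i < k { 0 } else { yielded[i - k] };
        let previous: u64 = yielded.last().copied().unwrap_or(0);
        // A result cannot be yielded before the one that precedes it.
        yielded.push((start + latency).max(previous));
    }
    yielded.last().copied().unwrap_or(0)
}

/// Unordered buffering with factor `k`: a new request starts as soon as any
/// running request completes.
fn unordered_total(latencies: &[u64], k: usize) -> u64 {
    assert!(k > 0, "the buffering factor must be positive");
    // Min-heap of the completion times of the running requests.
    let mut running: BinaryHeap<Reverse<u64>> = BinaryHeap::new();
    let mut total: u64 = 0;
    for &latency in latencies {
        let start: u64 = if running.len() < k {
            0
        } else {
            running.pop().map_or(0, |Reverse(finished_at)| finished_at)
        };
        let finish = start + latency;
        running.push(Reverse(finish));
        total = total.max(finish);
    }
    total
}
```

With made-up latencies of 5, 1, 1 and 1 ms and a buffering factor of 2, this model gives 8 ms without buffering, 6 ms with ordered buffering and 5 ms with unordered buffering. The slow first request blocks the ordered window until it completes, whereas the unordered variant keeps starting new requests in the other slot.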

#[tokio::main]
async fn main() {
    println!(
        "First 10 pages, buffered by 5:\n{:?}",
        get_n_pages_buffered(10, 5).await
    );
}

async fn get_n_pages_buffered(n: usize, buf_factor: usize) -> Vec<Vec<usize>> {
    get_pages_buffered(buf_factor).take(n).collect().await
}

fn get_pages_buffered(buf_factor: usize) -> impl Stream<Item = Vec<usize>> {
    todo!()
}

Ordered buffering

Let’s start with ordered buffering. This is done with the buffered method, which simply takes the buffering factor as parameter. As a first attempt, let’s try to apply it to our previous get_pages() stream.

// First attempt.
fn get_pages_buffered(buf_factor: usize) -> impl Stream<Item = Vec<usize>> {
    get_pages().buffered(buf_factor)
}

This unfortunately doesn’t work.

error[E0277]: `Vec<usize>` is not a future
  --> examples/09-pages-buffered-1-fail.rs:24:17
   |
24 |     get_pages().buffered(buf_factor)
   |                 ^^^^^^^^ `Vec<usize>` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `Vec<usize>`

error[E0277]: `Vec<usize>` is not a future
  --> examples/09-pages-buffered-1-fail.rs:23:45
   |
23 | fn get_pages_buffered(buf_factor: usize) -> impl Stream<Item = Vec<usize>> {
   |                                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `Vec<usize>` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `Vec<usize>`
   = note: required because of the requirements on the impl of `Stream` for `Buffered<impl Stream>`

The error message seems somewhat familiar, although still not completely clear. Let’s take a look at the signature of the buffered function and the resulting Buffered struct.

fn buffered(self, n: usize) -> impl Stream<Item = <Self::Item as Future>::Output>
where
    Self::Item: Future;

Here we can see that buffered expects to operate on a stream of Futures, and will yield a stream of the Future’s Output. However, our get_pages function returns an object implementing Stream<Item = Vec<usize>>. So we would like to instead have a get_pages_futures function, returning a Stream<Item = impl Future<Output = Vec<usize>>>.

If we look back at our attempts in the previous section, this was precisely the difference between the then and map combinators.

// Second attempt.
fn get_pages_buffered(buf_factor: usize) -> impl Stream<Item = Vec<usize>> {
    get_pages_futures().buffered(buf_factor)
}

// Naming the `Future` trait requires `use std::future::Future;`.
fn get_pages_futures() -> impl Stream<Item = impl Future<Output = Vec<usize>>> {
    stream::iter(0..).map(|i| get_page(i))
}

And this works :)

[0] # get_page(0) will complete in 0 ms
[0] # get_page(1) will complete in 4 ms
[0] # get_page(2) will complete in 9 ms
[0] # get_page(3) will complete in 1 ms
[0] # get_page(4) will complete in 6 ms
[1] # get_page(0) completed
[1] # get_page(5) will complete in 7 ms
[2] # get_page(3) completed
[5] # get_page(1) completed
[5] # get_page(6) will complete in 0 ms
[6] # get_page(6) completed
[7] # get_page(4) completed
[9] # get_page(5) completed
[10] # get_page(2) completed
[10] # get_page(7) will complete in 1 ms
[10] # get_page(8) will complete in 1 ms
[10] # get_page(9) will complete in 4 ms
[10] # get_page(10) will complete in 6 ms
[10] # get_page(11) will complete in 8 ms
[12] # get_page(7) completed
[12] # get_page(8) completed
[12] # get_page(12) will complete in 4 ms
[12] # get_page(13) will complete in 0 ms
[13] # get_page(13) completed
[16] # get_page(9) completed
First 10 pages, buffered by 5:
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 45, 46, 47, 48, 49], [50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69], [70, 71, 72, 73, 74, 75, 76, 77, 78, 79], [80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]]

[Figure: timeline of the requests]

As you can see, the results are collected in order, as expected.

In terms of buffering, the first 5 requests are triggered immediately at the beginning. Then, results are buffered, and a new request is only spawned once the first request of the current “buffering window” has completed. For example, get_page(6) is only triggered when get_page(1) completes, even though get_page(3) had already completed before that.

Batches of multiple requests can also be triggered together. For example, once get_page(2) completes, this “unlocks” all requests up to 6 (which had already completed), and therefore requests 7 to 11 are triggered immediately at that point.
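The window behavior described above can be reproduced with a toy re-implementation over a plain iterator of futures, using only the standard library. To be clear, this is my own simplified sketch and not the code of the buffered combinator: run_buffered, Slot, Countdown and NoopWaker are hypothetical names, and the loop busy-polls instead of relying on wake-ups.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Test future: pending for `remaining` polls, then resolves to `value`.
struct Countdown {
    remaining: u32,
    value: usize,
}

impl Future for Countdown {
    type Output = usize;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        if self.remaining == 0 {
            Poll::Ready(self.value)
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// One entry of the window: still running, or done and waiting for its turn.
enum Slot<F: Future> {
    Running(Pin<Box<F>>),
    Done(F::Output),
}

/// Drives at most `k` futures at a time and returns outputs in input order.
fn run_buffered<I, F>(mut inputs: I, k: usize) -> Vec<F::Output>
where
    I: Iterator<Item = F>,
    F: Future,
{
    assert!(k > 0, "the buffering factor must be positive");
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut window: VecDeque<Slot<F>> = VecDeque::new();
    let mut exhausted = false;
    let mut outputs = Vec::new();
    loop {
        // Refill the window: completed-but-not-yielded slots still count.
        while !exhausted && window.len() < k {
            match inputs.next() {
                Some(fut) => window.push_back(Slot::Running(Box::pin(fut))),
                None => exhausted = true,
            }
        }
        if window.is_empty() {
            return outputs;
        }
        // Poll every running future once.
        for slot in window.iter_mut() {
            let output = match slot {
                Slot::Running(fut) => match fut.as_mut().poll(&mut cx) {
                    Poll::Ready(value) => Some(value),
                    Poll::Pending => None,
                },
                Slot::Done(_) => None,
            };
            if let Some(value) = output {
                *slot = Slot::Done(value);
            }
        }
        // Only the front of the window may be yielded, which preserves order.
        while matches!(window.front(), Some(Slot::Done(_))) {
            if let Some(Slot::Done(value)) = window.pop_front() {
                outputs.push(value);
            }
        }
    }
}
```

Because a completed slot stays in the window until everything before it has been yielded, a slow request at the front delays the start of later requests, just like get_page(1) delayed get_page(6) in the log above.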

Also, because we used the buffered combinator before take-ing the first 10, some unnecessary requests are triggered - in this example for pages 10 to 13. A more efficient method would be to swap the take and buffered calls.

// Chain `take()` before `buffered()` to avoid wasting requests.
async fn get_n_pages_buffered(n: usize, buf_factor: usize) -> Vec<Vec<usize>> {
    get_pages_futures()
        .take(n)
        .buffered(buf_factor)
        .collect()
        .await
}

Sample output:

[0] # get_page(0) will complete in 0 ms
[0] # get_page(1) will complete in 7 ms
[0] # get_page(2) will complete in 6 ms
[0] # get_page(3) will complete in 7 ms
[0] # get_page(4) will complete in 9 ms
[1] # get_page(0) completed
[1] # get_page(5) will complete in 3 ms
[5] # get_page(5) completed
[7] # get_page(2) completed
[8] # get_page(1) completed
[8] # get_page(3) completed
[8] # get_page(6) will complete in 7 ms
[8] # get_page(7) will complete in 7 ms
[8] # get_page(8) will complete in 5 ms
[10] # get_page(4) completed
[10] # get_page(9) will complete in 3 ms
[14] # get_page(8) completed
[15] # get_page(9) completed
[16] # get_page(6) completed
[16] # get_page(7) completed
First 10 pages, buffered by 5:
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 45, 46, 47, 48, 49], [50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69], [70, 71, 72, 73, 74, 75, 76, 77, 78, 79], [80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]]

[Figure: timeline of the requests]

If you plan to process a limited number of requests, make sure to buffer these requests after taking the first N.
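As a rough count of the difference between the two orderings, the sketch below gives an upper bound on how many requests get created in each case. It assumes an unbounded source and a window that is refilled up to the buffering factor before each result is yielded, which is consistent with the logs above (pages 0 to 13, i.e. 14 requests, for 10 pages buffered by 5); the function names are made up for this example.

```rust
/// Upper bound on the requests created by `buffered(k)` followed by `take(n)`:
/// when the n-th result is yielded, up to `k - 1` further requests may
/// already be in the window.
fn max_requests_buffer_then_take(n: usize, k: usize) -> usize {
    assert!(k > 0, "the buffering factor must be positive");
    if n == 0 {
        0
    } else {
        n + k - 1
    }
}

/// With `take(n)` followed by `buffered(k)`, the source is cut off after
/// `n` requests, whatever the buffering factor.
fn requests_take_then_buffer(n: usize, _k: usize) -> usize {
    n
}
```

For 10 pages buffered by 5, this gives at most 14 requests when buffering first, versus exactly 10 when taking first.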

Unordered buffering

As mentioned before, the buffer_unordered method will buffer requests but yield results as soon as they arrive. Apart from that, it can fortunately be used in exactly the same way as buffered.

In this case, there will again be a difference depending on whether we buffer or take the requests first.

async fn get_n_pages_buffer_unordered(n: usize, buf_factor: usize) -> Vec<Vec<usize>> {
    get_pages_futures()
        .buffer_unordered(buf_factor)
        .take(n)
        .collect()
        .await
}

Sample output:

[0] # get_page(0) will complete in 0 ms
[0] # get_page(1) will complete in 2 ms
[0] # get_page(2) will complete in 2 ms
[0] # get_page(3) will complete in 0 ms
[0] # get_page(4) will complete in 8 ms
[1] # get_page(0) completed
[1] # get_page(3) completed
[1] # get_page(5) will complete in 7 ms
[1] # get_page(6) will complete in 3 ms
[3] # get_page(1) completed
[3] # get_page(2) completed
[3] # get_page(7) will complete in 0 ms
[3] # get_page(8) will complete in 1 ms
[4] # get_page(7) completed
[4] # get_page(9) will complete in 6 ms
[6] # get_page(6) completed
[6] # get_page(8) completed
[6] # get_page(10) will complete in 4 ms
[6] # get_page(11) will complete in 2 ms
[9] # get_page(4) completed
[9] # get_page(5) completed
[9] # get_page(11) completed
First 10 pages, buffer-unordered by 5:
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [70, 71, 72, 73, 74, 75, 76, 77, 78, 79], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69], [80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [40, 41, 42, 43, 44, 45, 46, 47, 48, 49], [50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [110, 111, 112, 113, 114, 115, 116, 117, 118, 119]]

[Figure: timeline of the requests]

As you can see, when we buffer before taking:

  • the results are returned out-of-order,
  • as soon as a request completes, a new one is triggered,
  • consequently, the first N results are not necessarily the first N pages - in this example we have the results for page 11 but not for page 9.

On the other hand, if we take before buffering, we really obtain the results for the first N pages, but in arbitrary order.

// Chain `take()` before `buffer_unordered()` to make sure the first N requests are returned.
async fn get_n_pages_buffer_unordered(n: usize, buf_factor: usize) -> Vec<Vec<usize>> {
    get_pages_futures()
        .take(n)
        .buffer_unordered(buf_factor)
        .collect()
        .await
}

Sample output:

[0] # get_page(0) will complete in 5 ms
[0] # get_page(1) will complete in 8 ms
[0] # get_page(2) will complete in 4 ms
[0] # get_page(3) will complete in 7 ms
[0] # get_page(4) will complete in 2 ms
[3] # get_page(4) completed
[3] # get_page(5) will complete in 2 ms
[5] # get_page(2) completed
[5] # get_page(6) will complete in 7 ms
[6] # get_page(0) completed
[6] # get_page(5) completed
[6] # get_page(7) will complete in 6 ms
[6] # get_page(8) will complete in 5 ms
[8] # get_page(3) completed
[8] # get_page(1) completed
[8] # get_page(9) will complete in 0 ms
[9] # get_page(9) completed
[13] # get_page(6) completed
[13] # get_page(8) completed
[14] # get_page(7) completed
First 10 pages, buffer-unordered by 5:
[[40, 41, 42, 43, 44, 45, 46, 47, 48, 49], [20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69], [80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]]

[Figure: timeline of the requests]

If you want to make sure the first N requests are processed, even out-of-order, make sure to buffer these requests after taking the first N.
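The effect of the two orderings can be checked on paper with a small model of unordered buffering. This is a simplification under my own assumptions (exact latencies, no scheduling overhead, ties broken by page index), and completion_order is a hypothetical helper, not part of any library.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Indices of the first `n` requests to complete when at most `k` requests
/// run at a time and a new one starts as soon as a slot frees up.
/// `latencies[i]` is the duration of request `i`; the slice plays the role
/// of the source stream.
fn completion_order(latencies: &[u64], k: usize, n: usize) -> Vec<usize> {
    assert!(k > 0, "the buffering factor must be positive");
    // Min-heap of (completion time, request index).
    let mut running: BinaryHeap<Reverse<(u64, usize)>> = BinaryHeap::new();
    let mut next = 0;
    // Start the first `k` requests at time 0.
    while next < latencies.len() && running.len() < k {
        running.push(Reverse((latencies[next], next)));
        next += 1;
    }
    let mut order = Vec::new();
    while order.len() < n {
        match running.pop() {
            Some(Reverse((finished_at, index))) => {
                order.push(index);
                // The freed slot is reused immediately by the next request.
                if next < latencies.len() {
                    running.push(Reverse((finished_at + latencies[next], next)));
                    next += 1;
                }
            }
            None => break,
        }
    }
    order
}
```

With made-up latencies of 9, 1, 1 and 1 ms and a buffering factor of 2, completion_order(&[9, 1, 1, 1], 2, 3) models buffering before taking 3 and returns pages 1, 2 and 3: the slow page 0 is missing. Restricting the source to the first 3 requests, as in completion_order(&[9, 1, 1], 2, 3), models taking before buffering and returns pages 1, 2 and 0, i.e. the first 3 pages in completion order.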

Flattening and chaining requests

The last part of this case study is to actually fetch the resources for the IDs from the search results. Similarly to the get_page function, we will define a fetch_resource asynchronous function that simulates an asynchronous request in the background. We also define a dummy Resource type, which for simplicity just records the ID of the resource.

#[derive(Clone, Copy)]
struct Resource(usize);

impl std::fmt::Debug for Resource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("r:{}", self.0))
    }
}

async fn fetch_resource(i: usize) -> Resource {
    let millis = Uniform::from(0..10).sample(&mut rand::thread_rng());
    println!(
        "[{}] ## fetch_resource({}) will complete in {} ms",
        START_TIME.elapsed().as_millis(),
        i,
        millis
    );

    sleep(Duration::from_millis(millis)).await;
    println!(
        "[{}] ## fetch_resource({}) completed",
        START_TIME.elapsed().as_millis(),
        i
    );
    Resource(i)
}

Our goal will be to define a function that returns impl Stream<Item = Resource>. So far, we had functions returning impl Stream<Item = Vec<usize>>, where each item is a page containing multiple IDs. The first step will be to obtain an impl Stream<Item = usize>, that is a stream of IDs without the concept of pages anymore.

This can be done quite simply with the flat_map method, together with stream::iter. There isn’t much complexity this time, but a nice trick is that .flat_map(f) is a shortcut for .map(f).flatten(). We’ll define three functions returning streams of IDs, respectively without buffering and with ordered or unordered buffering of the pages. In all cases, we will take N pages from the start, as discussed in the previous section.
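The same shortcut exists on plain iterators, which gives a compact way to see it in action. In this sketch, page is a hypothetical synchronous stand-in for get_page that returns 5 IDs per page, and the two ids_* helpers are names of my own.

```rust
/// Synchronous stand-in for `get_page`, with 5 IDs per page.
fn page(i: usize) -> Vec<usize> {
    (10 * i..10 * i + 5).collect()
}

/// IDs of the first `n` pages, using `flat_map`.
fn ids_flat_map(n: usize) -> Vec<usize> {
    (0..n).flat_map(page).collect()
}

/// Same result, spelled out as `map` followed by `flatten`.
fn ids_map_flatten(n: usize) -> Vec<usize> {
    (0..n).map(page).flatten().collect()
}
```

Both helpers return the same flat list of IDs; for two pages that is 0 to 4 followed by 10 to 14.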

To make the outputs more readable for the rest of this post, I’ve reduced the number of items per page from 10 to 5.

#[tokio::main]
async fn main() {
    println!(
        "IDs from first 5 pages:\n{:?}",
        get_ids_n_pages(5).collect::<Vec<_>>().await
    );
    println!(
        "IDs from first 5 pages, buffered by 3:\n{:?}",
        get_ids_n_pages_buffered(5, 3).collect::<Vec<_>>().await
    );
    println!(
        "IDs from first 5 pages, buffer-unordered by 3:\n{:?}",
        get_ids_n_pages_buffer_unordered(5, 3)
            .collect::<Vec<_>>()
            .await
    );
}

fn get_ids_n_pages(n: usize) -> impl Stream<Item = usize> {
    get_pages().take(n).flat_map(|page| stream::iter(page))
}

fn get_ids_n_pages_buffered(n: usize, buf_factor: usize) -> impl Stream<Item = usize> {
    get_pages_futures()
        .take(n)
        .buffered(buf_factor)
        .flat_map(|page| stream::iter(page))
}

fn get_ids_n_pages_buffer_unordered(n: usize, buf_factor: usize) -> impl Stream<Item = usize> {
    get_pages_futures()
        .take(n)
        .buffer_unordered(buf_factor)
        .flat_map(|page| stream::iter(page))
}

There isn’t much surprise in terms of interleaving of the get_page functions for the buffered variants. We can simply see that results are unordered (but still ordered within a page) in the buffer_unordered case.

[0] # get_page(0) will complete in 7 ms
...
[37] # get_page(4) completed
IDs from first 5 pages:
[0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 20, 21, 22, 23, 24, 30, 31, 32, 33, 34, 40, 41, 42, 43, 44]
[37] # get_page(0) will complete in 8 ms
...
[55] # get_page(3) completed
IDs from first 5 pages, buffered by 3:
[0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 20, 21, 22, 23, 24, 30, 31, 32, 33, 34, 40, 41, 42, 43, 44]
[55] # get_page(0) will complete in 1 ms
...
[67] # get_page(4) completed
IDs from first 5 pages, buffer-unordered by 3:
[0, 1, 2, 3, 4, 20, 21, 22, 23, 24, 10, 11, 12, 13, 14, 30, 31, 32, 33, 34, 40, 41, 42, 43, 44]

A variant could be to request the first N items, without knowing in advance how many pages that will be. In that case, we can only apply the take method at the end, and therefore we cannot use unordered buffering (otherwise there is no guarantee that we obtain the first N items).

#[tokio::main]
async fn main() {
    println!(
        "IDs from first 25 items, buffered by 3 pages:\n{:?}",
        get_ids_n_items_buffered(25, 3).collect::<Vec<_>>().await
    );
}

fn get_ids_n_items_buffered(n: usize, buf_factor: usize) -> impl Stream<Item = usize> {
    get_pages_futures()
        .buffered(buf_factor)
        .flat_map(|page| stream::iter(page))
        .take(n)
}

In this case, a few extra pages may be requested before the stream of IDs overall completes.

[0] # get_page(0) will complete in 1 ms
[0] # get_page(1) will complete in 7 ms
[0] # get_page(2) will complete in 1 ms
[2] # get_page(0) completed
[2] # get_page(2) completed
[2] # get_page(3) will complete in 3 ms
[7] # get_page(3) completed
[8] # get_page(1) completed
[8] # get_page(4) will complete in 4 ms
[8] # get_page(5) will complete in 5 ms
[8] # get_page(6) will complete in 2 ms
[11] # get_page(6) completed
[13] # get_page(4) completed
IDs from first 25 items, buffered by 3 pages:
[0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 20, 21, 22, 23, 24, 30, 31, 32, 33, 34, 40, 41, 42, 43, 44]
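
To make the cost of a late take more concrete, here is a toy model of the buffering window, written with plain iterators from the standard library rather than streams. It is only a sketch under simplifying assumptions: BufferedPages and the two requests_* helpers are hypothetical names, every request is treated as instantaneous, and each page is assumed to hold 5 IDs as in the trace above. The model only counts how many page requests get issued.

```rust
use std::collections::VecDeque;

// Toy, synchronous model of an ordered buffer over page requests.
// Hypothetical: it only counts requests and ignores latency entirely.
struct BufferedPages {
    next_page: usize,           // index of the next page to request
    limit: Option<usize>,       // Some(n) models `.take(n)` applied before buffering
    buf_factor: usize,          // maximum number of requests in flight
    in_flight: VecDeque<usize>, // requested pages, in request order
    requested: usize,           // total number of requests issued so far
}

impl BufferedPages {
    fn new(limit: Option<usize>, buf_factor: usize) -> Self {
        BufferedPages {
            next_page: 0,
            limit,
            buf_factor,
            in_flight: VecDeque::new(),
            requested: 0,
        }
    }
}

impl Iterator for BufferedPages {
    type Item = Vec<usize>;

    fn next(&mut self) -> Option<Vec<usize>> {
        // Top up the window before yielding the oldest page.
        while self.in_flight.len() < self.buf_factor
            && self.limit.map_or(true, |n| self.next_page < n)
        {
            self.in_flight.push_back(self.next_page);
            self.next_page += 1;
            self.requested += 1;
        }
        let page = self.in_flight.pop_front()?;
        // Assumed page layout: page i contains IDs 10*i .. 10*i+4.
        Some((10 * page..10 * page + 5).collect())
    }
}

// Number of page requests issued when the limit is applied before buffering.
fn requests_take_pages(n_pages: usize, buf_factor: usize) -> usize {
    let mut pages = BufferedPages::new(Some(n_pages), buf_factor);
    let ids: Vec<usize> = pages.by_ref().flatten().collect();
    assert_eq!(ids.len(), 5 * n_pages);
    pages.requested
}

// Number of page requests issued when the limit is applied after flattening.
fn requests_take_items(n_items: usize, buf_factor: usize) -> usize {
    let mut pages = BufferedPages::new(None, buf_factor);
    let ids: Vec<usize> = pages.by_ref().flatten().take(n_items).collect();
    assert_eq!(ids.len(), n_items);
    pages.requested
}
```

In this model, requests_take_pages(5, 3) counts 5 requests whereas requests_take_items(25, 3) counts 7, which is consistent with pages 0 to 6 being requested in the trace above.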

The last step is to convert a stream of IDs into a stream of resources, via the fetch_resource function. It will be essentially the same operation as we did in the beginning to convert stream::iter(0..) into a stream of pages. Here again, we can either fetch resources sequentially, or buffer them (ordered or not).

In the following code, I’ll use the same buffering for fetching pages and resources, but both are totally decoupled.

No buffering

Now that we’ve overcome compilation errors in the previous section, the code is relatively straightforward. We simply chain .then() with the fetch_resource function on the stream of resource IDs.

async fn collect_resources_n_pages(n: usize) -> Vec<Resource> {
    get_ids_n_pages(n)
        .then(|id| fetch_resource(id))
        .collect()
        .await
}

// Expanded version.
async fn collect_resources_n_pages(n: usize) -> Vec<Resource> {
    stream::iter(0..)
        .then(|i| get_page(i))
        .take(n)
        .flat_map(|page| stream::iter(page))
        .then(|id| fetch_resource(id))
        .collect()
        .await
}

Without buffering, the output shows that everything is sequential, as expected.

Ordered buffering

To buffer the requests, we replace .then() with .map().buffered().

async fn collect_resources_n_pages_buffered(n: usize, buf_factor: usize) -> Vec<Resource> {
    get_ids_n_pages_buffered(n, buf_factor)
        .map(|id| fetch_resource(id))
        .buffered(buf_factor)
        .collect()
        .await
}

// Expanded version.
async fn collect_resources_n_pages_buffered(n: usize, buf_factor: usize) -> Vec<Resource> {
    stream::iter(0..)
        .map(|i| get_page(i))
        .take(n)
        .buffered(buf_factor)
        .flat_map(|page| stream::iter(page))
        .map(|id| fetch_resource(id))
        .buffered(buf_factor)
        .collect()
        .await
}

Simple ordered buffering already helps to fetch resources concurrently. Note that buffering the pages as well doesn’t help much here, because resources are still requested more or less in order, up to the buffering window of 3 resources.

However, for this example we modeled get_page and fetch_resource to have similar latency characteristics. If getting a page is much slower than fetching a resource, buffering of pages would definitely help.

Unordered buffering

Here we simply replace .buffered() with .buffer_unordered().

async fn collect_resources_n_pages_buffer_unordered(n: usize, buf_factor: usize) -> Vec<Resource> {
    get_ids_n_pages_buffer_unordered(n, buf_factor)
        .map(|id| fetch_resource(id))
        .buffer_unordered(buf_factor)
        .collect()
        .await
}

// Expanded version.
async fn collect_resources_n_pages_buffer_unordered(n: usize, buf_factor: usize) -> Vec<Resource> {
    stream::iter(0..)
        .map(|i| get_page(i))
        .take(n)
        .buffer_unordered(buf_factor)
        .flat_map(|page| stream::iter(page))
        .map(|id| fetch_resource(id))
        .buffer_unordered(buf_factor)
        .collect()
        .await
}

In the buffer-unordered case, we see less ordering (as expected). However, there is still some structure, as all the resources in a given page are requested before those of the next available page. But of course the order in which resources complete is arbitrary.

[211] # get_page(0) will complete in 6 ms
[211] # get_page(1) will complete in 1 ms
[211] # get_page(2) will complete in 5 ms
[213] # get_page(1) completed
[213] ## fetch_resource(10) will complete in 1 ms
[213] ## fetch_resource(11) will complete in 6 ms
[213] ## fetch_resource(12) will complete in 0 ms
[214] ## fetch_resource(10) completed
[215] ## fetch_resource(12) completed
[215] ## fetch_resource(13) will complete in 9 ms
[215] ## fetch_resource(14) will complete in 1 ms
[217] ## fetch_resource(14) completed
[217] # get_page(2) completed
[217] ## fetch_resource(20) will complete in 0 ms
[218] ## fetch_resource(20) completed
[218] ## fetch_resource(21) will complete in 5 ms
[221] ## fetch_resource(11) completed
[221] ## fetch_resource(22) will complete in 0 ms
[222] ## fetch_resource(22) completed
[222] ## fetch_resource(23) will complete in 7 ms
[226] ## fetch_resource(13) completed
[226] ## fetch_resource(21) completed
[226] # get_page(3) will complete in 9 ms
[226] # get_page(0) completed
[226] ## fetch_resource(24) will complete in 6 ms
[226] ## fetch_resource(0) will complete in 3 ms
[230] ## fetch_resource(23) completed
[230] ## fetch_resource(0) completed
[230] ## fetch_resource(1) will complete in 5 ms
[230] ## fetch_resource(2) will complete in 9 ms
[234] ## fetch_resource(24) completed
[234] ## fetch_resource(3) will complete in 4 ms
[236] ## fetch_resource(1) completed
[236] ## fetch_resource(4) will complete in 9 ms
[239] ## fetch_resource(3) completed
[239] # get_page(4) will complete in 7 ms
[239] # get_page(3) completed
[239] ## fetch_resource(30) will complete in 0 ms
[241] ## fetch_resource(2) completed
[241] ## fetch_resource(30) completed
[241] ## fetch_resource(31) will complete in 4 ms
[241] ## fetch_resource(32) will complete in 3 ms
[245] ## fetch_resource(32) completed
[245] ## fetch_resource(33) will complete in 4 ms
[246] ## fetch_resource(4) completed
[246] ## fetch_resource(31) completed
[246] ## fetch_resource(34) will complete in 7 ms
[248] # get_page(4) completed
[248] ## fetch_resource(40) will complete in 2 ms
[251] ## fetch_resource(33) completed
[251] ## fetch_resource(40) completed
[251] ## fetch_resource(41) will complete in 8 ms
[251] ## fetch_resource(42) will complete in 0 ms
[252] ## fetch_resource(42) completed
[252] ## fetch_resource(43) will complete in 0 ms
[254] ## fetch_resource(43) completed
[254] ## fetch_resource(44) will complete in 4 ms
[255] ## fetch_resource(34) completed
[259] ## fetch_resource(44) completed
[260] ## fetch_resource(41) completed
Resources from first 5 pages, buffer-unordered by 3:
[r:10, r:12, r:14, r:20, r:11, r:22, r:13, r:21, r:23, r:0, r:24, r:1, r:3, r:2, r:30, r:32, r:4, r:31, r:33, r:40, r:42, r:43, r:34, r:44, r:41]

If we look at the detailed output, we can also observe an interesting property of futures: futures are lazy, i.e. a future only really completes when the executor decides to poll it. In this execution trace, we can see that even though get_page(0) is supposed to complete within 6 milliseconds, it only completes after 15 milliseconds (from timestamp 211 to timestamp 226), more precisely when the executor has already requested all resources from pages 1 and 2 and needs more resource IDs to fetch.

So like in the ordered case, buffering of the pages mostly helps if fetching a resource is much faster than fetching a page.

For more details about this lazy behavior of futures, and what executor and polling mean, I encourage you to take a look at the documentation of the std::future::Future trait, as well as the relevant chapter from the Asynchronous Programming in Rust book.
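
The laziness itself can be observed without any stream. The following sketch only uses the standard library; block_on and NoopWaker are hypothetical helpers forming a toy executor that busy-polls, which is fine for a demonstration but is not how a real runtime such as tokio works.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing: enough for a toy executor that busy-polls.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (an assumption of this sketch): polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Returns whether the body of the future had run (before polling, after polling).
fn lazy_demo() -> (bool, bool) {
    let started = Cell::new(false);
    // Creating the future does not execute its body.
    let fut = async {
        started.set(true);
        42
    };
    let before = started.get();
    // Only polling (here via the toy executor) drives the body to completion.
    let value = block_on(fut);
    assert_eq!(value, 42);
    (before, started.get())
}
```

Here lazy_demo() returns (false, true): the flag is still unset after the future is created, and only flips once the executor has polled it.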

Too many obscure error messages? Define small functions!

As you may have noticed, we have so far defined many small functions that each transform one stream into another. You may wonder what’s the point of splitting our code into so many functions when the main logic can be written as a single chain of stream combinators.

The main issue is that when futures and lambdas are involved, a long chain can lead to very complex error messages if you make a mistake, such as using .map() instead of .then() (or vice versa). This is because the type of the whole expression nests the type of every combinator in the chain, so the longer the chain, the longer the types that the compiler prints in its error messages, to the point of being “unreadable”.
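
To get a feel for how quickly these types grow, here is a small analogy with plain iterators rather than streams, so that it only needs the standard library. The helpers type_name_of and chain_type_names are hypothetical, and the exact text returned by std::any::type_name is not guaranteed to be stable across compiler versions.

```rust
use std::any::type_name;

// Returns the compiler's name for the type of `_value` (the format is not guaranteed).
fn type_name_of<T>(_value: &T) -> &'static str {
    type_name::<T>()
}

// Type names of a short iterator chain and of a longer one.
fn chain_type_names() -> (&'static str, &'static str) {
    let short = (0usize..).take(5);
    let long = (0usize..)
        .map(|i| vec![10 * i, 10 * i + 1])
        .take(5)
        .flat_map(|page| page.into_iter())
        .map(|id| id + 1);
    (type_name_of(&short), type_name_of(&long))
}
```

Printing both names shows that each extra combinator wraps the previous type in one more layer, closures included, which is the same nesting that shows up in the stream error messages.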

Here are a few examples.

The first example is the collect_resources_n_pages function in its “factored” form. If we use map instead of then, we get the following error message.

async fn collect_resources_n_pages(n: usize) -> Vec<Resource> {
    // Used `map` instead of `then`.
    get_ids_n_pages(n)
        .map(|id| fetch_resource(id))
        .collect()
        .await
}

We can somewhat understand from this error message that we are trying to collect Futures instead of Resources.

error[E0277]: the trait bound `Vec<Resource>: Extend<impl futures::Future>` is not satisfied
  --> examples/18-pages-resources-map-insteadof-then-fail.rs:20:5
   |
20 | /     get_ids_n_pages(n)
21 | |         .map(|id| fetch_resource(id))
22 | |         .collect()
23 | |         .await
   | |______________^ the trait `Extend<impl futures::Future>` is not implemented for `Vec<Resource>`
   |
   = help: the following implementations were found:
             <Vec<T, A> as Extend<&'a T>>
             <Vec<T, A> as Extend<T>>
   = note: required because of the requirements on the impl of `futures::Future` for `Collect<futures::stream::Map<impl Stream, [closure@examples/18-pages-resources-map-insteadof-then-fail.rs:21:14: 21:37]>, Vec<Resource>>`
   = note: required by `futures::Future::poll`

error[E0277]: the trait bound `Vec<Resource>: Extend<impl futures::Future>` is not satisfied
  --> examples/18-pages-resources-map-insteadof-then-fail.rs:22:10
   |
22 |         .collect()
   |          ^^^^^^^ the trait `Extend<impl futures::Future>` is not implemented for `Vec<Resource>`
   |
   = help: the following implementations were found:
             <Vec<T, A> as Extend<&'a T>>
             <Vec<T, A> as Extend<T>>

Let’s make the same mistake on the expanded version of the function.

async fn collect_resources_n_pages(n: usize) -> Vec<Resource> {
    stream::iter(0..)
        .then(|i| get_page(i))
        .take(n)
        .flat_map(|page| stream::iter(page))
        // Used `map` instead of `then` here.
        .map(|id| fetch_resource(id))
        .collect()
        .await
}

The error message is somewhat similar so you may be able to get away with it, but the type in the “note” part is much longer, and crucially it’s much more difficult to pinpoint which line in the chain is causing the error.

error[E0277]: the trait bound `Vec<Resource>: Extend<impl futures::Future>` is not satisfied
  --> examples/19-pages-resources-map-insteadof-then-expanded-fail.rs:20:5
   |
20 | /     stream::iter(0..)
21 | |         .then(|i| get_page(i))
22 | |         .take(n)
23 | |         .flat_map(|page| stream::iter(page))
24 | |         .map(|id| fetch_resource(id))
25 | |         .collect()
26 | |         .await
   | |______________^ the trait `Extend<impl futures::Future>` is not implemented for `Vec<Resource>`
   |
   = help: the following implementations were found:
             <Vec<T, A> as Extend<&'a T>>
             <Vec<T, A> as Extend<T>>
   = note: required because of the requirements on the impl of `futures::Future` for `Collect<futures::stream::Map<futures::stream::FlatMap<futures::stream::Take<futures::stream::Then<futures::stream::Iter<RangeFrom<usize>>, impl futures::Future, [closure@examples/19-pages-resources-map-insteadof-then-expanded-fail.rs:21:15: 21:30]>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/19-pages-resources-map-insteadof-then-expanded-fail.rs:23:19: 23:44]>, [closure@examples/19-pages-resources-map-insteadof-then-expanded-fail.rs:24:14: 24:37]>, Vec<Resource>>`
   = note: required by `futures::Future::poll`

error[E0277]: the trait bound `Vec<Resource>: Extend<impl futures::Future>` is not satisfied
  --> examples/19-pages-resources-map-insteadof-then-expanded-fail.rs:25:10
   |
25 |         .collect()
   |          ^^^^^^^ the trait `Extend<impl futures::Future>` is not implemented for `Vec<Resource>`
   |
   = help: the following implementations were found:
             <Vec<T, A> as Extend<&'a T>>
             <Vec<T, A> as Extend<T>>

As a second example, let’s take the opposite case, where a map is expected but we use a then instead.

async fn collect_resources_n_pages_buffered(n: usize, buf_factor: usize) -> Vec<Resource> {
    // Used `then` instead of `map`.
    get_ids_n_pages_buffered(n, buf_factor)
        .then(|id| fetch_resource(id))
        .buffered(buf_factor)
        .collect()
        .await
}

This time, the error message is already more complex. Fortunately the real compiler output is colorized in the console, which makes it a bit more readable than here.

error[E0277]: `Resource` is not a future
  --> examples/20-pages-resources-then-insteadof-map-fail.rs:22:10
   |
22 |         .buffered(buf_factor)
   |          ^^^^^^^^ `Resource` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `Resource`

error[E0599]: no method named `collect` found for struct `Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>` in the current scope
  --> examples/20-pages-resources-then-insteadof-map-fail.rs:23:10
   |
23 |           .collect()
   |            ^^^^^^^ method not found in `Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>`
   | 
  ::: /home/dev-1000/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.13/src/stream/stream/buffered.rs:12:1
   |
12 | / pin_project! {
13 | |     /// Stream for the [`buffered`](super::StreamExt::buffered) method.
14 | |     #[must_use = "streams do nothing unless polled"]
15 | |     pub struct Buffered<St>
...  |
24 | |     }
25 | | }
   | | -
   | | |
   | | doesn't satisfy `_: Iterator`
   | |_doesn't satisfy `_: StreamExt`
   |   doesn't satisfy `_: Stream`
   |
   = note: the method `collect` exists but the following trait bounds were not satisfied:
           `Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>: Stream`
           which is required by `Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>: StreamExt`
           `&Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>: Stream`
           which is required by `&Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>: StreamExt`
           `&mut Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>: Stream`
           which is required by `&mut Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>: StreamExt`
           `Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>: Iterator`
           which is required by `&mut Buffered<futures::stream::Then<impl Stream, impl futures::Future, [closure@examples/20-pages-resources-then-insteadof-map-fail.rs:21:15: 21:38]>>: Iterator`

Now, let’s make the same mistake on the expanded function.

async fn collect_resources_n_pages_buffered(n: usize, buf_factor: usize) -> Vec<Resource> {
    stream::iter(0..)
        .map(|i| get_page(i))
        .take(n)
        .buffered(buf_factor)
        .flat_map(|page| stream::iter(page))
        // Used `then` instead of `map` here.
        .then(|id| fetch_resource(id))
        .buffered(buf_factor)
        .collect()
        .await
}

Once again, the error is somewhat similar but the types are much more complex and take a lot of space in the console output.

error[E0277]: `Resource` is not a future
  --> examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:26:10
   |
26 |         .buffered(buf_factor)
   |          ^^^^^^^^ `Resource` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `Resource`

error[E0599]: no method named `collect` found for struct `Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>` in the current scope
  --> examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:27:10
   |
27 |           .collect()
   |            ^^^^^^^ method not found in `Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>`
   | 
  ::: /home/dev-1000/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.13/src/stream/stream/buffered.rs:12:1
   |
12 | / pin_project! {
13 | |     /// Stream for the [`buffered`](super::StreamExt::buffered) method.
14 | |     #[must_use = "streams do nothing unless polled"]
15 | |     pub struct Buffered<St>
...  |
24 | |     }
25 | | }
   | | -
   | | |
   | | doesn't satisfy `_: Iterator`
   | |_doesn't satisfy `_: StreamExt`
   |   doesn't satisfy `_: Stream`
   |
   = note: the method `collect` exists but the following trait bounds were not satisfied:
           `Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>: Stream`
           which is required by `Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>: StreamExt`
           `&Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>: Stream`
           which is required by `&Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>: StreamExt`
           `&mut Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>: Stream`
           which is required by `&mut Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>: StreamExt`
           `Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>: Iterator`
           which is required by `&mut Buffered<futures::stream::Then<futures::stream::FlatMap<Buffered<futures::stream::Take<futures::stream::Map<futures::stream::Iter<RangeFrom<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:21:14: 21:29]>>>, futures::stream::Iter<std::vec::IntoIter<usize>>, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:24:19: 24:44]>, impl futures::Future, [closure@examples/21-pages-resources-then-insteadof-map-expanded-fail.rs:25:15: 25:38]>>: Iterator`

When error messages become complex, break the chain of combinators into intermediate functions, and use the impl Trait syntax.

To convert a stream into another stream via an async function, either use .then(f) or .map(f).buffered(n).

What we’ve learned so far

You’ve now reached the end of this first case study! In the next blog post, we’ll study how to implement cancellation of queries within a stream. But before you go on to read this second part, let’s recap some tips that we’ve learned so far about asynchronous programming in Rust.

The first thing is the syntax: when do you need to add async {} blocks and .await expressions? In summary, if some expression foo has type T then async { foo } has a type that implements Future<Output = T>. Conversely, if bar has a type that implements Future<Output = T> then bar.await has type T. As we’ve seen, this means that |i| async move { get_page(i).await } is simply equivalent to |i| get_page(i).
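
As a quick sanity check of this rule, the following sketch asks the compiler to confirm that both closures return a future with the same output type. The get_page function here is a simplified stand-in for the one used in this post (no latency, no logging), and expect_future and check_types are hypothetical helpers. Nothing is polled: the check happens entirely at compile time.

```rust
use std::future::Future;

// Simplified stand-in for the real get_page: returns the 5 IDs of page i.
async fn get_page(i: usize) -> Vec<usize> {
    (10 * i..10 * i + 5).collect()
}

// Compile-time check: only accepts a reference to something implementing Future<Output = T>.
fn expect_future<T, F: Future<Output = T>>(_fut: &F) {}

fn check_types() {
    // Calling an async function directly yields a future of its return type.
    let direct = |i: usize| get_page(i);
    // Awaiting inside an async block unwraps the future, and the block wraps it again.
    let wrapped = |i: usize| async move { get_page(i).await };

    expect_future::<Vec<usize>, _>(&direct(0));
    expect_future::<Vec<usize>, _>(&wrapped(0));
}
```

If either closure returned something other than a future of Vec<usize>, the corresponding call to expect_future would fail to compile.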

My second tip is to define intermediate functions every time your asynchronous expressions become complicated. Indeed, the compiler will help you validate that each small function has correct types. You will also likely want to leverage the impl Trait syntax!

Now, some advice more specific to the Stream API.

  • If you need to call .take(), do it before any buffering: this will make sure that no additional requests are buffered that would then be discarded.
  • If you can afford obtaining results in arbitrary order, use .buffer_unordered() rather than .buffered(), as it will decrease the overall latency. The results will of course depend on your buffering parameter and latency characteristics, but in our example unordered buffering reduced latency by a factor of 2.
  • In terms of types, .then() is equivalent to .map().buffered().

Without further ado, I encourage you to check my next blog post to learn how to cancel requests within a stream.

