In the previous blog post, we learned how to use asynchronous streams in Rust. We saw how to overcome the sometimes puzzling compilation errors that arise when we use the async keyword incorrectly, then studied how to buffer asynchronous queries in a stream to reduce overall latency, and in particular how unordered buffering can improve latency even further.

In this blog post, we’ll study the case of cancellation. Typically, a UI thread in the foreground requests some resources depending on user interaction, and a worker thread in the background processes these requests by making asynchronous queries to the network, a database, etc. However, the UI thread may want to cancel these requests if the user has already moved on, for example after scrolling very fast.

So how can we combine streams, which provide a convenient and idiomatic API to process items asynchronously, together with cancellation? The goals are both to avoid wasting resources with useless queries and to offer the best latency to the user.

As for the previous post, the full code examples are available on this GitHub repository. At the time of writing, I’ve tested them with Rust version 1.50.0.

$ cargo --version
cargo 1.50.0 (f04e7fab7 2021-02-04)

Case study: cancelling background queries

In this case study, we will consider a setup with two threads. A “foreground” UI thread requests some resources in response to user interaction, and a worker thread processes these requests in the background. This time, the “trick” is that the UI thread may cancel requests that are not relevant anymore, typically because the user has already moved on to display something else. This is especially relevant if processing requests takes a non-trivial amount of time (e.g. due to fetching resources over the network), while the user interacts quickly with the UI (e.g. scrolling very fast).

So the goal will be to establish communication between the foreground and background threads in the form of a stream of queries, while supporting cancellation to avoid wasting time and resources on requests that are not relevant anymore.

Setting up a worker task

In this case study, we will depend on the same crates as for the first one, i.e. tokio, futures and rand, as well as lazy_static for convenience.

Before diving into the cancellation problem, let’s start with a basic setup with a task producing queries and another task processing them. For the sake of this example, we will use the tokio::spawn function to create the two tasks.

To send the requests from one task to the other, we will use a multi-producer single-consumer queue from the futures crate, more precisely the futures::channel::mpsc::unbounded version. In our case, we have a single consumer (the background processing task) and a single producer (the foreground UI task), but there was no single-producer queue in the futures crate. The receiving end of this queue implements the Stream trait, which we already became familiar with in the first case study.

In this basic setup, we will simply print the queries – without actually querying anything. We will use the for_each method for that.

use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::join;
use futures::stream::StreamExt;
use tokio::spawn;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Send 10 queries");
    send_receive(10).await?;
    Ok(())
}

async fn send_receive(n: usize) -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = unbounded();

    let send = spawn(async move {
        send_task(tx, n).await;
    });

    let receive = spawn(async move {
        receive_task(rx).await;
    });

    let (send_res, receive_res) = join!(send, receive);
    send_res?;
    receive_res?;
    Ok(())
}

async fn send_task(tx: UnboundedSender<usize>, n: usize) {
    for i in 0..n {
        tx.unbounded_send(i).unwrap();
    }
}

async fn receive_task(rx: UnboundedReceiver<usize>) {
    rx.for_each(|i| println!("# query({})", i)).await;
}

Unfortunately, this first attempt fails to compile: the closure that we passed to the for_each function returns (), not a future.

error[E0277]: `()` is not a future
  --> examples/22-ui-hello-1-fail.rs:37:8
   |
37 |     rx.for_each(|i| println!("# query({})", i)).await;
   |        ^^^^^^^^ `()` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `()`

error[E0277]: `()` is not a future
  --> examples/22-ui-hello-1-fail.rs:37:5
   |
37 |     rx.for_each(|i| println!("# query({})", i)).await;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `()` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `()`
   = note: required because of the requirements on the impl of `futures::Future` for `ForEach<UnboundedReceiver<usize>, (), [closure@examples/22-ui-hello-1-fail.rs:37:17: 37:47]>`
   = note: required by `futures::Future::poll`

If we look more closely at the for_each function, it essentially has the following signature.

fn for_each<Fut, F>(self, f: F) -> impl Future<Output = ()>
where
    F: FnMut(Self::Item) -> Fut,
    Fut: Future<Output = ()>, 

So, similarly to the then function that we encountered in the first case study, we need to wrap the printing statement inside of an async block. We also need to mark this block as move, to take ownership of the i variable.

// Second attempt.
async fn receive_task(rx: UnboundedReceiver<usize>) {
    rx.for_each(|i| async move { println!("# query({})", i) })
        .await;
}

We then obtain the following output, as expected.

Send 10 queries
# query(0)
# query(1)
# query(2)
# query(3)
# query(4)
# query(5)
# query(6)
# query(7)
# query(8)
# query(9)
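
To see why the closure has to return a future, here is a minimal sketch that mimics the bounds of for_each using only the standard library. The for_each_item and block_on helpers are hypothetical stand-ins written for this illustration (they are not part of futures or tokio), and the sketch assumes a toolchain recent enough to provide std::task::Wake. It also suggests that an already-completed future such as std::future::ready(()) satisfies the bound just as well as an async block.

```rust
use std::future::{ready, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that are ready when first polled.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical stand-in for `for_each`, with the same bounds on the closure.
fn for_each_item<F, Fut>(items: Vec<usize>, mut f: F)
where
    F: FnMut(usize) -> Fut,
    Fut: Future<Output = ()>,
{
    for item in items {
        // The closure only builds the future; it still has to be driven to completion.
        block_on(f(item));
    }
}

fn collect_queries(n: usize) -> Vec<String> {
    let mut seen = Vec::new();
    // Returning `()` from this closure would not compile: `()` is not a `Future`.
    for_each_item((0..n).collect(), |i| {
        seen.push(format!("# query({})", i));
        ready(())
    });
    seen
}

fn main() {
    for line in collect_queries(3) {
        println!("{}", line);
    }
}
```

Note that in this sketch the side effect happens inside the closure itself, before the future is returned. With an async block, as in the second attempt above, the work happens when the future is polled.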

Resolving and buffering queries

The next step is to actually make the (asynchronous) queries in the background task. Like in the first case, we’ll simulate that with a get_data function that sleeps for a random amount of time, and returns the input as data, wrapped into a dummy Data structure for this example.

use rand::distributions::{Distribution, Uniform};
use std::time::Duration;
use tokio::time::sleep;

#[derive(Clone, Copy)]
struct Data(usize);

impl std::fmt::Debug for Data {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("d:{}", self.0))
    }
}

async fn get_data(i: usize) -> Data {
    let millis = Uniform::from(0..10).sample(&mut rand::thread_rng());
    println!("## get_data({}) will complete in {} ms", i, millis);

    sleep(Duration::from_millis(millis)).await;
    println!("## get_data({}) completed", i);
    Data(i)
}

We will then call this function on each request.

async fn receive_task_queries(rx: UnboundedReceiver<usize>) {
    rx.for_each(|i| async move {
        let data = get_data(i).await;
        println!("## data = {:?}", data);
    })
    .await;
}

As expected, for now everything is sequential.

Resolve 10 queries
[0] ## get_data(0) will complete in 0 ms
[1] ## get_data(0) completed
## data = d:0
[1] ## get_data(1) will complete in 7 ms
[9] ## get_data(1) completed
## data = d:1
[9] ## get_data(2) will complete in 6 ms
[16] ## get_data(2) completed
## data = d:2
[16] ## get_data(3) will complete in 9 ms
[26] ## get_data(3) completed
## data = d:3
[27] ## get_data(4) will complete in 0 ms
[28] ## get_data(4) completed
## data = d:4
[28] ## get_data(5) will complete in 5 ms
[34] ## get_data(5) completed
## data = d:5
[34] ## get_data(6) will complete in 8 ms
[43] ## get_data(6) completed
## data = d:6
[43] ## get_data(7) will complete in 4 ms
[49] ## get_data(7) completed
## data = d:7
[49] ## get_data(8) will complete in 6 ms
[56] ## get_data(8) completed
## data = d:8
[56] ## get_data(9) will complete in 6 ms
[63] ## get_data(9) completed
## data = d:9

Like in the first case study, we can apply buffering to process multiple requests in parallel. We can first try to apply the .buffered() function directly on the receiving end of the channel.

// Buffering, first attempt.
async fn receive_task_queries_buffered(rx: UnboundedReceiver<usize>, buf_factor: usize) {
    rx.buffered(buf_factor)
        .for_each(|i| async move {
            let data = get_data(i).await;
            println!("## data = {:?}", data);
        })
        .await;
}

This unfortunately doesn’t work, and as you may imagine the error message is not the easiest to parse.

error[E0277]: `usize` is not a future
  --> examples/25-ui-buffered-1-fail.rs:48:17
   |
48 |     rx.buffered(buf_factor)
   |                 ^^^^^^^^^^ `usize` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `usize`

error[E0599]: no method named `for_each` found for struct `Buffered<UnboundedReceiver<usize>>` in the current scope
  --> examples/25-ui-buffered-1-fail.rs:49:10
   |
49 |           .for_each(|i| async move {
   |            ^^^^^^^^ method not found in `Buffered<UnboundedReceiver<usize>>`
   | 
  ::: /home/dev-1000/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.13/src/stream/stream/buffered.rs:12:1
   |
12 | / pin_project! {
13 | |     /// Stream for the [`buffered`](super::StreamExt::buffered) method.
14 | |     #[must_use = "streams do nothing unless polled"]
15 | |     pub struct Buffered<St>
...  |
24 | |     }
25 | | }
   | | -
   | | |
   | | doesn't satisfy `Buffered<UnboundedReceiver<usize>>: Iterator`
   | |_doesn't satisfy `Buffered<UnboundedReceiver<usize>>: StreamExt`
   |   doesn't satisfy `Buffered<UnboundedReceiver<usize>>: Stream`
   |
   = note: the method `for_each` exists but the following trait bounds were not satisfied:
           `Buffered<UnboundedReceiver<usize>>: Stream`
           which is required by `Buffered<UnboundedReceiver<usize>>: StreamExt`
           `&Buffered<UnboundedReceiver<usize>>: Stream`
           which is required by `&Buffered<UnboundedReceiver<usize>>: StreamExt`
           `&mut Buffered<UnboundedReceiver<usize>>: Stream`
           which is required by `&mut Buffered<UnboundedReceiver<usize>>: StreamExt`
           `Buffered<UnboundedReceiver<usize>>: Iterator`
           which is required by `&mut Buffered<UnboundedReceiver<usize>>: Iterator`

Recalling our first case study again, the buffered combinator takes a stream of futures as input, whereas our receiving channel is a stream of integers. What we can do is call map with the get_data function before buffering, and leave only the printing statement in the for_each step.

// Buffering, second attempt.
async fn receive_task_queries_buffered(rx: UnboundedReceiver<usize>, buf_factor: usize) {
    rx.map(|i| get_data(i))
        .buffered(buf_factor)
        .for_each(|data| async move {
            println!("## data = {:?}", data);
        })
        .await;
}

This version compiles and produces the following output.

Resolve 10 queries, buffered by 5
[0] ## get_data(0) will complete in 9 ms
[0] ## get_data(1) will complete in 3 ms
[0] ## get_data(2) will complete in 0 ms
[0] ## get_data(3) will complete in 7 ms
[0] ## get_data(4) will complete in 7 ms
[1] ## get_data(2) completed
[4] ## get_data(1) completed
[8] ## get_data(3) completed
[8] ## get_data(4) completed
[10] ## get_data(0) completed
## data = d:0
## data = d:1
## data = d:2
## data = d:3
## data = d:4
[10] ## get_data(5) will complete in 3 ms
[10] ## get_data(6) will complete in 7 ms
[10] ## get_data(7) will complete in 0 ms
[10] ## get_data(8) will complete in 8 ms
[10] ## get_data(9) will complete in 1 ms
[11] ## get_data(7) completed
[12] ## get_data(9) completed
[14] ## get_data(5) completed
## data = d:5
[18] ## get_data(6) completed
## data = d:6
## data = d:7
[18] ## get_data(8) completed
## data = d:8
## data = d:9

We could similarly use buffer_unordered, but it doesn’t come with any additional pitfalls, so let’s move on to cancelling the queries.

Filtering cancelled queries

In the next step, we’ll simulate cancellation of queries from the sending task, as follows. This task will send requests in batches of 5, and sleep for a random time between batches. We consider that at any given time, only requests of the current batch are valid. Requests from previous batches are expired and should therefore be cancelled, i.e. the receiving worker task should skip them.

To mark which requests are currently valid, we’ll define the following ValidRange struct, which wraps a Range object into suitable synchronization primitives, namely an RwLock inside an Arc.

use std::ops::Range;
use std::sync::{Arc, RwLock};

#[derive(Clone)]
struct ValidRange {
    range: Arc<RwLock<Range<usize>>>,
}

impl ValidRange {
    fn new() -> (ValidRange, ValidRange) {
        let writer = Arc::new(RwLock::new(0..0));
        let reader = writer.clone();
        (ValidRange { range: writer }, ValidRange { range: reader })
    }

    fn set(&self, range: Range<usize>) {
        *self.range.write().unwrap() = range;
    }

    fn is_valid(&self, x: usize) -> bool {
        self.range.read().unwrap().contains(&x)
    }
}
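
As a quick usage sketch, the following standalone program repeats the ValidRange definition and checks that an update made through the writer handle is visible through the reader handle, including when the write happens on another thread. The set_from_thread_then_check helper is hypothetical and only exists for this illustration.

```rust
use std::ops::Range;
use std::sync::{Arc, RwLock};
use std::thread;

#[derive(Clone)]
struct ValidRange {
    range: Arc<RwLock<Range<usize>>>,
}

impl ValidRange {
    fn new() -> (ValidRange, ValidRange) {
        let writer = Arc::new(RwLock::new(0..0));
        let reader = writer.clone();
        (ValidRange { range: writer }, ValidRange { range: reader })
    }

    fn set(&self, range: Range<usize>) {
        *self.range.write().unwrap() = range;
    }

    fn is_valid(&self, x: usize) -> bool {
        self.range.read().unwrap().contains(&x)
    }
}

/// Hypothetical helper: sets the range from another thread, then reads it back here.
fn set_from_thread_then_check(range: Range<usize>, x: usize) -> bool {
    let (writer, reader) = ValidRange::new();
    // The writer handle moves into the thread; the reader stays on this side.
    thread::spawn(move || writer.set(range)).join().unwrap();
    reader.is_valid(x)
}

fn main() {
    let (writer, reader) = ValidRange::new();
    // The initial range 0..0 is empty, so nothing is valid yet.
    println!("0 valid initially: {}", reader.is_valid(0));

    writer.set(0..5);
    println!("4 valid in 0..5: {}", reader.is_valid(4));

    // Moving on to the next batch expires the previous one.
    writer.set(10..15);
    println!("4 valid in 10..15: {}", reader.is_valid(4));

    println!("cross-thread: {}", set_from_thread_then_check(20..25, 22));
}
```

Both handles point at the same Arc, so there is no copying of the range itself: setting a new range through one handle is what makes earlier requests expire for the other.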

We’ll also define a ValidCounter object, to check how many of the queries made by the receiving task were still valid, and how many had already expired. We will use the AtomicUsize primitive to safely update the counters across threads/tasks.

use std::sync::atomic::{AtomicUsize, Ordering};

struct ValidCounter {
    valid: AtomicUsize,
    expired: AtomicUsize,
}

impl ValidCounter {
    fn new() -> ValidCounter {
        ValidCounter {
            valid: AtomicUsize::new(0),
            expired: AtomicUsize::new(0),
        }
    }

    fn increment(&self, is_valid: bool) {
        if is_valid {
            self.valid.fetch_add(1, Ordering::SeqCst);
        } else {
            self.expired.fetch_add(1, Ordering::SeqCst);
        }
    }

    fn print(&self) {
        let valid = self.valid.load(Ordering::SeqCst);
        let expired = self.expired.load(Ordering::SeqCst);

        println!(
            "Made {} queries, {} results were still valid, {} expired",
            valid + expired,
            valid,
            expired
        );
    }
}
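
To illustrate why atomics are enough here, the sketch below increments a shared ValidCounter from several plain threads and reads the totals at the end. It uses std::thread rather than tokio tasks to stay self-contained, and the snapshot method and count_from_threads function are hypothetical additions for this example only.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

struct ValidCounter {
    valid: AtomicUsize,
    expired: AtomicUsize,
}

impl ValidCounter {
    fn new() -> ValidCounter {
        ValidCounter {
            valid: AtomicUsize::new(0),
            expired: AtomicUsize::new(0),
        }
    }

    fn increment(&self, is_valid: bool) {
        if is_valid {
            self.valid.fetch_add(1, Ordering::SeqCst);
        } else {
            self.expired.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Hypothetical helper (not in the original code): returns (valid, expired).
    fn snapshot(&self) -> (usize, usize) {
        (
            self.valid.load(Ordering::SeqCst),
            self.expired.load(Ordering::SeqCst),
        )
    }
}

/// Spawns `workers` threads that each record `per_worker` results.
fn count_from_threads(workers: usize, per_worker: usize) -> (usize, usize) {
    let counter = Arc::new(ValidCounter::new());
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for k in 0..per_worker {
                    // Arbitrary rule for this sketch: even iterations count as valid.
                    counter.increment(k % 2 == 0);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    counter.snapshot()
}

fn main() {
    let (valid, expired) = count_from_threads(4, 100);
    println!("{} valid, {} expired", valid, expired);
}
```

Because increment only needs &self, the counter can be shared behind an Arc without any lock.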

In our first scenario, we’ll simply mark the validity of requests on the sending side, without actually cancelling anything on the receiving side.

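#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Don't cancel 25 queries, buffered by 3");
    // 5 batches of 5 queries each, i.e. 25 queries in total.
    congested_queries_buffered(5, 3).await?;
    Ok(())
}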

async fn congested_queries_buffered(
    n: usize,
    buf_factor: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = unbounded();
    let (valid_writer, valid_reader) = ValidRange::new();
    let counter = Arc::new(ValidCounter::new());

    let send = spawn(async move {
        send_task_tracking_validity(tx, n, valid_writer).await;
    });

    let counter_writer = counter.clone();
    let receive = spawn(async move {
        receive_task_observing(rx, buf_factor, &valid_reader, &counter_writer).await;
    });

    let (send_res, receive_res) = join!(send, receive);
    send_res?;
    receive_res?;

    counter.print();
    Ok(())
}

async fn send_task_tracking_validity(
    tx: UnboundedSender<usize>,
    n: usize,
    valid_writer: ValidRange,
) {
    for i in 0..n {
        let range = 10 * i..10 * i + 5;
        valid_writer.set(range.clone());
        for j in range {
            println!("## unbounded_send({})", j);
            tx.unbounded_send(j).unwrap();
        }
        let millis = Uniform::from(0..10).sample(&mut rand::thread_rng());
        println!("## sleep({}) for {} ms", i, millis);

        let duration = Duration::from_millis(millis);
        sleep(duration).await;
        println!("## sleep({}) completed", i);
    }
}

async fn receive_task_observing(
    rx: UnboundedReceiver<usize>,
    buf_factor: usize,
    valid_reader: &ValidRange,
    counter_writer: &Arc<ValidCounter>,
) {
    rx.map(|i| get_data(i))
        .buffered(buf_factor)
        .for_each(|data| async move {
            let is_valid = valid_reader.is_valid(data.0);
            counter_writer.increment(is_valid);
            println!(
                "## data = {:?} ({})",
                data,
                if is_valid { "valid" } else { "expired" }
            );
        })
        .await;
}

As you can see in the following output, because we never cancel any queries while fetching data takes a lot of time, we end up accumulating expired queries. In this example, out of 25 queries, only the first one and the last 5 are still valid when their results arrive, and these last results arrive with a high latency due to all the expired queries that we make in the meantime!

You can also see that even though get_data(2) finishes instantly, it is blocked behind get_data(1) due to the ordered buffering, and therefore its result has already expired by the time the stream yields it.

Don't cancel 25 queries, buffered by 3
## unbounded_send(0)
## unbounded_send(1)
## unbounded_send(2)
## unbounded_send(3)
## unbounded_send(4)
## sleep(0) for 8 ms
[0] ## get_data(0) will complete in 5 ms
[0] ## get_data(1) will complete in 9 ms
[0] ## get_data(2) will complete in 0 ms
[1] ## get_data(2) completed
[6] ## get_data(0) completed
## data = d:0 (valid)
[6] ## get_data(3) will complete in 0 ms
[7] ## get_data(3) completed
## sleep(0) completed
## unbounded_send(10)
## unbounded_send(11)
## unbounded_send(12)
## unbounded_send(13)
## unbounded_send(14)
## sleep(1) for 7 ms
[9] ## get_data(1) completed
## data = d:1 (expired)
## data = d:2 (expired)
## data = d:3 (expired)
[9] ## get_data(4) will complete in 4 ms
[9] ## get_data(10) will complete in 8 ms
[9] ## get_data(11) will complete in 5 ms
[14] ## get_data(4) completed
## data = d:4 (expired)
[15] ## get_data(12) will complete in 0 ms
[16] ## get_data(11) completed
[16] ## get_data(12) completed
## sleep(1) completed
## unbounded_send(20)
## unbounded_send(21)
## unbounded_send(22)
## unbounded_send(23)
## unbounded_send(24)
## sleep(2) for 5 ms
[19] ## get_data(10) completed
## data = d:10 (expired)
## data = d:11 (expired)
## data = d:12 (expired)
[19] ## get_data(13) will complete in 9 ms
[19] ## get_data(14) will complete in 7 ms
[19] ## get_data(20) will complete in 5 ms
## sleep(2) completed
## unbounded_send(30)
## unbounded_send(31)
## unbounded_send(32)
## unbounded_send(33)
## unbounded_send(34)
## sleep(3) for 0 ms
## sleep(3) completed
## unbounded_send(40)
## unbounded_send(41)
## unbounded_send(42)
## unbounded_send(43)
## unbounded_send(44)
## sleep(4) for 5 ms
[26] ## get_data(20) completed
[27] ## get_data(14) completed
[30] ## get_data(13) completed
## data = d:13 (expired)
## data = d:14 (expired)
## data = d:20 (expired)
[30] ## get_data(21) will complete in 2 ms
[30] ## get_data(22) will complete in 5 ms
[30] ## get_data(23) will complete in 6 ms
## sleep(4) completed
[32] ## get_data(21) completed
## data = d:21 (expired)
[32] ## get_data(24) will complete in 7 ms
[35] ## get_data(22) completed
## data = d:22 (expired)
[35] ## get_data(30) will complete in 1 ms
[37] ## get_data(23) completed
## data = d:23 (expired)
[37] ## get_data(31) will complete in 1 ms
[38] ## get_data(30) completed
[39] ## get_data(31) completed
[40] ## get_data(24) completed
## data = d:24 (expired)
## data = d:30 (expired)
## data = d:31 (expired)
[41] ## get_data(32) will complete in 5 ms
[41] ## get_data(33) will complete in 6 ms
[41] ## get_data(34) will complete in 7 ms
[47] ## get_data(32) completed
## data = d:32 (expired)
[47] ## get_data(40) will complete in 6 ms
[48] ## get_data(33) completed
## data = d:33 (expired)
[48] ## get_data(41) will complete in 7 ms
[48] ## get_data(34) completed
## data = d:34 (expired)
[48] ## get_data(42) will complete in 9 ms
[53] ## get_data(40) completed
## data = d:40 (valid)
[53] ## get_data(43) will complete in 4 ms
[57] ## get_data(41) completed
## data = d:41 (valid)
[57] ## get_data(44) will complete in 9 ms
[59] ## get_data(42) completed
## data = d:42 (valid)
[59] ## get_data(43) completed
## data = d:43 (valid)
[67] ## get_data(44) completed
## data = d:44 (valid)
Made 25 queries, 6 results were still valid, 19 expired

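If you look more closely at the code, you may wonder why the receiving task takes some parameters by reference, even though they are not used elsewhere and could be moved into the function. The reason is that for_each takes an FnMut closure, which is called once per item, and each call creates a new async move block that takes ownership of whatever it captures. A captured value can only be moved out of the closure once, unless its type is Copy. Shared references are Copy, so each future simply receives its own copy of the reference.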

If instead we write the following code where valid_reader is passed by value:

async fn receive_task_observing(
    rx: UnboundedReceiver<usize>,
    buf_factor: usize,
    valid_reader: ValidRange,
    counter_writer: &Arc<ValidCounter>,
) {
    rx.map(|i| get_data(i))
        .buffered(buf_factor)
        .for_each(|data| async move {
            let is_valid = valid_reader.is_valid(data.0);
            counter_writer.increment(is_valid);
            println!(
                "## data = {:?} ({})",
                data,
                if is_valid { "valid" } else { "expired" }
            );
        })
        .await;
}

then we obtain the following error.

error[E0507]: cannot move out of `valid_reader`, a captured variable in an `FnMut` closure
  --> examples/28-ui-no-cancel-2-fail.rs:78:37
   |
73 |       valid_reader: ValidRange,
   |       ------------ captured outer variable
...
78 |           .for_each(|data| async move {
   |  _____________________________________^
79 | |             let is_valid = valid_reader.is_valid(data.0);
   | |                            ------------
   | |                            |
   | |                            move occurs because `valid_reader` has type `ValidRange`, which does not implement the `Copy` trait
   | |                            move occurs due to use in generator
80 | |             counter_writer.increment(is_valid);
81 | |             println!(
...  |
85 | |             );
86 | |         })
   | |_________^ move out of `valid_reader` occurs here
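
The same constraint can be reproduced without any async code. In the sketch below, an inner move closure plays the role of the async move block, and run_for_each is a hypothetical stand-in for a combinator that calls its FnMut closure once per item. It shows two ways to satisfy the compiler: capturing a shared reference (which is Copy), or cloning an Arc on every call so that each inner closure owns its own handle.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a combinator like `for_each`: calls `f` once per
/// item, then immediately runs whatever `f` returned.
fn run_for_each<F, G>(items: &[usize], mut f: F) -> Vec<bool>
where
    F: FnMut(usize) -> G,
    G: FnOnce() -> bool,
{
    items.iter().map(|&x| f(x)()).collect()
}

/// Works: `limit` is a shared reference, so every inner `move` closure
/// receives its own copy of it.
fn by_reference(limit: &usize, items: &[usize]) -> Vec<bool> {
    run_for_each(items, |x| move || x < *limit)
}

/// Also works: the outer closure keeps the `Arc` and hands a fresh clone
/// to each inner closure.
fn by_clone(limit: Arc<usize>, items: &[usize]) -> Vec<bool> {
    run_for_each(items, |x| {
        let limit = Arc::clone(&limit);
        move || x < *limit
    })
}

// Moving the `Arc` itself into the inner closure, i.e.
// `|x| move || x < *limit` with `limit: Arc<usize>`, is rejected with E0507:
// the first call would move `limit` out of the `FnMut` closure, leaving
// nothing for the second call.

fn main() {
    println!("{:?}", by_reference(&3, &[1, 5]));
    println!("{:?}", by_clone(Arc::new(3), &[1, 5]));
}
```

Passing references, as done for valid_reader and counter_writer, avoids the per-item clone, at the cost of tying the futures to the lifetime of the borrowed values.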

Cancellation

Let’s now implement cancellation. To do that, we can use the filter combinator on the stream to skip queries that are expired – and we can reuse the valid_reader object to check the validity of each query.

Note that we can put filter at the front of our chain of combinators, and the filtering check will still only be evaluated when a query is about to be made. This is the “magic” of async programming: futures are not evaluated right away, but are lazily driven by the .await at the end of the chain.

async fn receive_task_buffered_cancelling(
    rx: UnboundedReceiver<usize>,
    buf_factor: usize,
    valid_reader: &ValidRange,
    counter_writer: &Arc<ValidCounter>,
) {
    // First attempt at filtering.
    rx.filter(|i| async move {
        let is_valid = valid_reader.is_valid(*i);
        println!("## filter({}) = {}", i, is_valid);
        is_valid
    })
    .map(|i| get_data(i))
    .buffered(buf_factor)
    .for_each(|data| async move {
        let is_valid = valid_reader.is_valid(data.0);
        counter_writer.increment(is_valid);
        println!(
            "## data = {:?} ({})",
            data,
            if is_valid { "valid" } else { "expired" }
        );
    })
    .await;
}

However, if we simply try it like this, we obtain another strange compilation error.

error: lifetime may not live long enough
  --> examples/29-ui-cancel-buffered-1-fail.rs:76:19
   |
76 |       rx.filter(|i| async move {
   |  ________________--_^
   | |                ||
   | |                |return type of closure `impl futures::Future` contains a lifetime `'2`
   | |                has type `&'1 usize`
77 | |         let is_valid = valid_reader.is_valid(*i);
78 | |         println!("## filter({}) = {}", i, is_valid);
79 | |         is_valid
80 | |     })
   | |_____^ returning this value requires that `'1` must outlive `'2`

Somehow, the compiler complains that the reference i has a lifetime that is too short compared to some unspecified lifetime '2. It’s not entirely clear to me why this happens, but I managed to get around it by using the future::ready utility function from the futures crate, which is mentioned in filter’s documentation.

    // Second attempt at filtering.
    rx.filter(|i| {
        let is_valid = valid_reader.is_valid(*i);
        println!("## filter({}) = {}", i, is_valid);
        future::ready(is_valid)
    })
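
To see why this version satisfies the borrow checker, here is a standalone sketch using std::future::ready, the standard library counterpart of the futures function. The predicate reads through the reference synchronously and returns a future that owns only a bool, so that future can outlive the item it was computed from. The is_small, check_after_drop and block_on names are hypothetical and only serve this illustration.

```rust
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that are ready when first polled.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Same shape as a `filter` predicate: the item arrives by reference.
fn is_small(i: &usize) -> Ready<bool> {
    // The reference is read right away, while it is still valid...
    let small = *i < 10;
    // ...so the returned future owns a plain `bool` and borrows nothing.
    ready(small)
}

/// The future is created from a short-lived item and evaluated afterwards.
fn check_after_drop(value: usize) -> bool {
    let fut = {
        let item = value;
        is_small(&item)
    }; // `item` is gone here, but `fut` does not refer to it.
    block_on(fut)
}

fn main() {
    println!("3 is small: {}", check_after_drop(3));
    println!("42 is small: {}", check_after_drop(42));
}
```

The return type Ready<bool> carries no lifetime parameter at all, which makes it visible in the signature that nothing is borrowed.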

The following is an example output with cancellation. The number of “useful” queries (i.e. those whose results were still up-to-date when they arrived) didn’t significantly increase (7 instead of 6), but more importantly we made far fewer “wasted” queries (9 instead of 19). And because we didn’t have to wait for a backlog of wasted queries, the valid results arrived with a much better latency.

Cancel 25 queries, buffered by 3
## unbounded_send(0)
## unbounded_send(1)
## unbounded_send(2)
## unbounded_send(3)
## unbounded_send(4)
## sleep(0) for 7 ms
## filter(0) = true
## filter(1) = true
## filter(2) = true
[0] ## get_data(0) will complete in 9 ms
[0] ## get_data(1) will complete in 3 ms
[0] ## get_data(2) will complete in 1 ms
[2] ## get_data(2) completed
[4] ## get_data(1) completed
## sleep(0) completed
## unbounded_send(10)
## unbounded_send(11)
## unbounded_send(12)
## unbounded_send(13)
## unbounded_send(14)
## sleep(1) for 8 ms
[10] ## get_data(0) completed
## data = d:0 (expired)
## filter(3) = false
## filter(4) = false
## filter(10) = true
## data = d:1 (expired)
## filter(11) = true
## data = d:2 (expired)
## filter(12) = true
[11] ## get_data(10) will complete in 5 ms
[11] ## get_data(11) will complete in 9 ms
[11] ## get_data(12) will complete in 4 ms
[16] ## get_data(12) completed
[17] ## get_data(10) completed
## data = d:10 (valid)
## filter(13) = true
## sleep(1) completed
## unbounded_send(20)
## unbounded_send(21)
[17] ## get_data(13) will complete in 2 ms
## unbounded_send(22)
## unbounded_send(23)
## unbounded_send(24)
## sleep(2) for 3 ms
[21] ## get_data(11) completed
## data = d:11 (expired)
## filter(14) = false
## filter(20) = true
## data = d:12 (expired)
## filter(21) = true
[21] ## get_data(13) completed
## data = d:13 (expired)
## filter(22) = true
[21] ## get_data(20) will complete in 0 ms
[21] ## get_data(21) will complete in 8 ms
[21] ## get_data(22) will complete in 0 ms
[22] ## get_data(20) completed
## sleep(2) completed
## unbounded_send(30)
## data = d:20 (valid)
## unbounded_send(31)
## unbounded_send(32)
## unbounded_send(33)
## unbounded_send(34)
## sleep(3) for 4 ms
## filter(23) = false
## filter(24) = false
## filter(30) = true
[22] ## get_data(22) completed
[22] ## get_data(30) will complete in 6 ms
## sleep(3) completed
## unbounded_send(40)
## unbounded_send(41)
## unbounded_send(42)
## unbounded_send(43)
## unbounded_send(44)
## sleep(4) for 3 ms
[30] ## get_data(21) completed
## data = d:21 (expired)
## filter(31) = false
## filter(32) = false
## filter(33) = false
## filter(34) = false
## filter(40) = true
## data = d:22 (expired)
## filter(41) = true
[30] ## get_data(30) completed
## data = d:30 (expired)
## filter(42) = true
[30] ## get_data(40) will complete in 6 ms
[30] ## get_data(41) will complete in 1 ms
[30] ## get_data(42) will complete in 5 ms
[32] ## get_data(41) completed
## sleep(4) completed
[36] ## get_data(42) completed
[37] ## get_data(40) completed
## data = d:40 (valid)
## filter(43) = true
## data = d:41 (valid)
## filter(44) = true
## data = d:42 (valid)
[37] ## get_data(43) will complete in 0 ms
[37] ## get_data(44) will complete in 9 ms
[39] ## get_data(43) completed
## data = d:43 (valid)
[47] ## get_data(44) completed
## data = d:44 (valid)
Made 16 queries, 7 results were still valid, 9 expired

Edit: as noted on Reddit, it’s possible to use an async move block in place of the future::ready call. This way, the future only captures the is_valid value, rather than the i variable, which is a reference whose lifetime is too short.

    // Alternative with an inner `async` block.
    rx.filter(|i| {
        let is_valid = valid_reader.is_valid(*i);
        println!("## filter({}) = {}", i, is_valid);
        async move { is_valid }
    })
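
The laziness that this section relies on (nothing runs until the future is polled) can be observed in isolation with the standard library. The sketch below builds a future with std::future::poll_fn, checks that its body has not run yet, and then polls it once by hand. The observe_laziness function and the NoopWake type are hypothetical helpers for this illustration, and the example assumes a toolchain recent enough to provide poll_fn.

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a future that is ready on first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (ran before polling, ran after polling, output of the future).
fn observe_laziness() -> (bool, bool, usize) {
    let ran = Cell::new(false);

    // Building the future runs none of its body.
    let mut fut = Box::pin(poll_fn(|_cx| {
        ran.set(true);
        Poll::Ready(42usize)
    }));
    let before = ran.get();

    // Polling is what actually executes the body; `.await` does this for us
    // in regular async code.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let output = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => unreachable!("this future is ready on first poll"),
    };

    (before, ran.get(), output)
}

fn main() {
    let (before, after, output) = observe_laziness();
    println!("ran before poll: {}", before);
    println!("ran after poll: {}", after);
    println!("output: {}", output);
}
```

Stream combinators are driven by polling in the same way, which is why the filter closure only checks the validity of a query at the moment the buffer asks for the next item.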

Unordered buffering

We can also use unordered buffering, as shown in the following example output. Here we have 8 useful queries and 7 wasted queries, but most importantly the latency improved as well compared to ordered buffering.

Cancel 25 queries, buffer-unordered by 3
## unbounded_send(0)
## unbounded_send(1)
## unbounded_send(2)
## unbounded_send(3)
## unbounded_send(4)
## sleep(0) for 5 ms
## filter(0) = true
## filter(1) = true
## filter(2) = true
[0] ## get_data(0) will complete in 5 ms
[0] ## get_data(1) will complete in 1 ms
[0] ## get_data(2) will complete in 8 ms
[2] ## get_data(1) completed
## data = d:1 (valid)
## filter(3) = true
[2] ## get_data(3) will complete in 2 ms
[5] ## get_data(3) completed
## data = d:3 (valid)
## filter(4) = true
[5] ## get_data(4) will complete in 0 ms
[6] ## get_data(0) completed
## sleep(0) completed
## unbounded_send(10)
## unbounded_send(11)
## data = d:0 (valid)
## filter(10) = true
## unbounded_send(12)
## unbounded_send(13)
## unbounded_send(14)
## sleep(1) for 2 ms
[6] ## get_data(4) completed
## data = d:4 (expired)
## filter(11) = true
[6] ## get_data(10) will complete in 7 ms
[6] ## get_data(11) will complete in 4 ms
## sleep(1) completed
## unbounded_send(20)
## unbounded_send(21)
## unbounded_send(22)
## unbounded_send(23)
## unbounded_send(24)
## sleep(2) for 3 ms
[9] ## get_data(2) completed
## data = d:2 (expired)
## filter(12) = false
## filter(13) = false
## filter(14) = false
## filter(20) = true
[9] ## get_data(20) will complete in 6 ms
[10] ## get_data(11) completed
## data = d:11 (expired)
## filter(21) = true
[11] ## get_data(21) will complete in 5 ms
## sleep(2) completed
## unbounded_send(30)
## unbounded_send(31)
## unbounded_send(32)
## unbounded_send(33)
## unbounded_send(34)
## sleep(3) for 3 ms
[14] ## get_data(10) completed
## data = d:10 (expired)
## filter(22) = false
## filter(23) = false
## filter(24) = false
## filter(30) = true
[14] ## get_data(30) will complete in 5 ms
## sleep(3) completed
## unbounded_send(40)
## unbounded_send(41)
## unbounded_send(42)
## unbounded_send(43)
## unbounded_send(44)
[17] ## get_data(20) completed
## data = d:20 (expired)
## filter(31) = false
## filter(32) = false
## filter(33) = false
## filter(34) = false
## filter(40) = true
## sleep(4) for 9 ms
[17] ## get_data(21) completed
## data = d:21 (expired)
## filter(41) = true
[17] ## get_data(40) will complete in 6 ms
[17] ## get_data(41) will complete in 6 ms
[20] ## get_data(30) completed
## data = d:30 (expired)
## filter(42) = true
[20] ## get_data(42) will complete in 0 ms
[21] ## get_data(42) completed
## data = d:42 (valid)
## filter(43) = true
[21] ## get_data(43) will complete in 9 ms
[24] ## get_data(40) completed
## data = d:40 (valid)
## filter(44) = true
[24] ## get_data(41) completed
## data = d:41 (valid)
[24] ## get_data(44) will complete in 0 ms
[25] ## get_data(44) completed
## data = d:44 (valid)
## sleep(4) completed
[32] ## get_data(43) completed
## data = d:43 (valid)
Made 15 queries, 8 results were still valid, 7 expired

Bonus: “One type is more general than the other”

As a bonus, I’d like to discuss an even weirder error message that I encountered while writing this blog post. Originally, I hadn’t separated the sending and receiving tasks into their own functions, but I simply spawned the tasks directly in the main function. This led to some mysterious message…

The case I want to mention here is my first attempt at filtering, where I used an async move block, rather than a regular closure returning future::ready. As we discussed in the previous section, this led to an error related to lifetimes. However, when the code is directly inlined as follows, the error message becomes even more confusing.

async fn cancel_queries_buffered(
    n: usize,
    buf_factor: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = unbounded();
    let (valid_writer, valid_reader) = ValidRange::new();
    let counter = Arc::new(ValidCounter::new());

    let send = spawn(async move {
        send_task_tracking_validity(tx, n, valid_writer).await;
    });

    // Receive task directly inlined in the `spawn` invocation.
    let counter_writer = counter.clone();
    let receive = spawn(async move {
        let valid_reader = &valid_reader;
        let counter_writer = &counter_writer;

        // Invalid attempt at filtering.
        rx.filter(|i| async move {
            let is_valid = valid_reader.is_valid(*i);
            println!("## filter({}) = {}", i, is_valid);
            is_valid
        })
        .map(|i| get_data(i))
        .buffered(buf_factor)
        .for_each(|data| async move {
            let is_valid = valid_reader.is_valid(data.0);
            counter_writer.increment(is_valid);
            println!(
                "## data = {:?} ({})",
                data,
                if is_valid { "valid" } else { "expired" }
            );
        })
        .await;
    });

    let (send_res, receive_res) = join!(send, receive);
    send_res?;
    receive_res?;

    counter.print();
    Ok(())
}

Indeed, the error message states that “one type is more general than the other”, without giving any detail. It expected an impl futures::Future type but found an impl futures::Future type!

error[E0308]: mismatched types
  --> examples/32-ui-cancel-buffered-3-fail.rs:37:19
   |
37 |       let receive = spawn(async move {
   |                     ^^^^^ one type is more general than the other
...
40 |           rx.filter(|i| async move {
   |  __________________________________-
41 | |             let is_valid = valid_reader.is_valid(*i);
42 | |             println!("## filter({}) = {}", i, is_valid);
43 | |             is_valid
44 | |         })
   | |         -
   | |         |
   | |_________the expected generator
   |           the found generator
   |
   = note: expected opaque type `impl futures::Future`
              found opaque type `impl futures::Future`

As you can see, the lifetimes that appeared in the error message of the previous section are now completely hidden inside an “opaque” Future type.

Breaking up your code into smaller functions can yield better error messages. Conversely, nesting async blocks can lead to very obscure errors!

Writing our own stream combinator

If we take a step back, cancelling in the receiving task is done via the first .filter() call, but the rest of the task is independent from the stream that we pass to it. So you may wonder: could we extract out this cancelling logic, and make the receiving task work on any stream? In other words, could we define some cancel combinator, in the spirit of all the useful combinators available in the StreamExt trait?

You’ll notice that the combinator functions in StreamExt each return a struct that wraps the combinator’s logic together with the original stream. We could try to do that manually, by defining a new struct and implementing the Stream trait for it.
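
To give an idea of what the manual approach involves, here is a minimal sketch that uses the synchronous Iterator trait as a stand-in for Stream, so that it runs with the standard library only. The names DemoValidRange, Cancel and cancel_manual are hypothetical and only serve this illustration; a real Stream implementation would additionally have to deal with pinning and poll_next.

```rust
use std::ops::Range;

/// Hypothetical, simplified stand-in for the post's `ValidRange`.
struct DemoValidRange {
    range: Range<usize>,
}

impl DemoValidRange {
    fn is_valid(&self, i: usize) -> bool {
        self.range.contains(&i)
    }
}

/// Hand-written adapter: owns the inner iterator and borrows the range.
struct Cancel<'a, I> {
    inner: I,
    valid_range: &'a DemoValidRange,
}

impl<'a, I: Iterator<Item = usize>> Iterator for Cancel<'a, I> {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        // Skip items until one falls inside the valid range.
        while let Some(i) = self.inner.next() {
            if self.valid_range.is_valid(i) {
                return Some(i);
            }
        }
        None
    }
}

/// Constructor playing the role of the combinator function.
fn cancel_manual<'a, I: Iterator<Item = usize>>(
    inner: I,
    valid_range: &'a DemoValidRange,
) -> Cancel<'a, I> {
    Cancel { inner, valid_range }
}

fn main() {
    let valid = DemoValidRange { range: 3..6 };
    let kept: Vec<usize> = cancel_manual(0..10, &valid).collect();
    println!("{:?}", kept); // prints [3, 4, 5]
}
```

Even in this simplified form, the struct, its lifetime parameter and the trait implementation are all boilerplate around what is essentially a filter.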

However, you’ll also notice that all our cancel combinator really does is a .filter() call, which means that we shouldn’t need to define any new struct. Even better, thanks to the impl Trait syntax, we shouldn’t have to write down any complex type at all.

Ideally, we’d like to write something like the following:

    let receive = spawn(async move {
        receive_task_buffered(
            cancel(rx, &valid_reader),
            buf_factor,
            &valid_reader,
            &counter_writer,
        )
        .await;
    });

where receive_task_buffered simply processes a stream where cancelled requests have already been filtered out.

async fn receive_task_buffered(
    rx: impl Stream<Item = usize>,
    buf_factor: usize,
    valid_reader: &ValidRange,
    counter_writer: &Arc<ValidCounter>,
) {
    rx.map(|i| get_data(i))
        .buffered(buf_factor)
        .for_each(|data| async move {
            let is_valid = valid_reader.is_valid(data.0);
            counter_writer.increment(is_valid);
            println!(
                "## data = {:?} ({})",
                data,
                if is_valid { "valid" } else { "expired" }
            );
        })
        .await;
}

Our first attempt at writing our cancel combinator will simply wrap the filter call.

// First attempt.
fn cancel<S: Stream<Item = usize>>(
    stream: S,
    valid_range: &ValidRange,
) -> impl Stream<Item = usize> {
    stream.filter(|i| {
        let is_valid = valid_range.is_valid(*i);
        println!("## filter({}) = {}", i, is_valid);
        future::ready(is_valid)
    })
}

However, written this way, the function fails to compile with an error related to lifetimes.

error[E0759]: `valid_range` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> examples/33-ui-cancel-combinator-1-fail.rs:59:19
   |
57 |       valid_range: &ValidRange,
   |                    ----------- this data with an anonymous lifetime `'_`...
58 |   ) -> impl Stream<Item = usize> {
59 |       stream.filter(|i| {
   |  ___________________^
60 | |         let is_valid = valid_range.is_valid(*i);
61 | |         println!("## filter({}) = {}", i, is_valid);
62 | |         future::ready(is_valid)
63 | |     })
   | |_____^ ...is captured here...
   |
note: ...and is required to live as long as `'static` here
  --> examples/33-ui-cancel-combinator-1-fail.rs:58:6
   |
58 | ) -> impl Stream<Item = usize> {
   |      ^^^^^^^^^^^^^^^^^^^^^^^^^
help: to declare that the `impl Trait` captures data from argument `valid_range`, you can add an explicit `'_` lifetime bound
   |
58 | ) -> impl Stream<Item = usize> + '_ {
   |                                ^^^^

Indeed, the result stream depends on the valid_range reference. We can follow the advice given by the compiler.

// Second attempt.
fn cancel<S: Stream<Item = usize>>(
    stream: S,
    valid_range: &ValidRange,
) -> impl Stream<Item = usize> + '_ {
    stream.filter(|i| {
        let is_valid = valid_range.is_valid(*i);
        println!("## filter({}) = {}", i, is_valid);
        future::ready(is_valid)
    })
}

However, this isn’t enough because we need to tie the lifetime of the output to the lifetime of the input stream as well.

error[E0311]: the parameter type `S` may not live long enough
  --> examples/34-ui-cancel-combinator-2-fail.rs:58:6
   |
55 | fn cancel<S: Stream<Item = usize>>(
   |           -- help: consider adding an explicit lifetime bound...: `S: 'a +`
...
58 | ) -> impl Stream<Item = usize> + '_ {
   |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
note: the parameter type `S` must be valid for the anonymous lifetime #1 defined on the function body at 55:1...
  --> examples/34-ui-cancel-combinator-2-fail.rs:55:1
   |
55 | / fn cancel<S: Stream<Item = usize>>(
56 | |     stream: S,
57 | |     valid_range: &ValidRange,
58 | | ) -> impl Stream<Item = usize> + '_ {
   | |___________________________________^
note: ...so that the type `futures::stream::Filter<S, futures::future::Ready<bool>, [closure@examples/34-ui-cancel-combinator-2-fail.rs:59:19: 63:6]>` will meet its required lifetime bounds
  --> examples/34-ui-cancel-combinator-2-fail.rs:58:6
   |
58 | ) -> impl Stream<Item = usize> + '_ {
   |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Following the compiler’s advice, we can replace the anonymous lifetime with an explicit lifetime that bounds both the input stream and the reference.

// Third attempt.
fn cancel<'a, S: Stream<Item = usize> + 'a>(
    stream: S,
    valid_range: &'a ValidRange,
) -> impl Stream<Item = usize> + 'a {
    stream.filter(|i| {
        let is_valid = valid_range.is_valid(*i);
        println!("## filter({}) = {}", i, is_valid);
        future::ready(is_valid)
    })
}

We still have one last error, a classic of async programming in Rust: we forgot to move referenced variables into the closure.

error[E0373]: closure may outlive the current function, but it borrows `valid_range`, which is owned by the current function
  --> examples/35-ui-cancel-combinator-3-fail.rs:59:19
   |
59 |     stream.filter(|i| {
   |                   ^^^ may outlive borrowed value `valid_range`
60 |         let is_valid = valid_range.is_valid(*i);
   |                        ----------- `valid_range` is borrowed here
   |
note: closure is returned here
  --> examples/35-ui-cancel-combinator-3-fail.rs:59:5
   |
59 | /     stream.filter(|i| {
60 | |         let is_valid = valid_range.is_valid(*i);
61 | |         println!("## filter({}) = {}", i, is_valid);
62 | |         future::ready(is_valid)
63 | |     })
   | |______^
help: to force the closure to take ownership of `valid_range` (and any other referenced variables), use the `move` keyword
   |
59 |     stream.filter(move |i| {
   |                   ^^^^^^^^

Once we fix that, our cancel combinator is ready to go!

// Fourth attempt.
fn cancel<'a, S: Stream<Item = usize> + 'a>(
    stream: S,
    valid_range: &'a ValidRange,
) -> impl Stream<Item = usize> + 'a {
    stream.filter(move |i| {
        let is_valid = valid_range.is_valid(*i);
        println!("## filter({}) = {}", i, is_valid);
        future::ready(is_valid)
    })
}

I don’t include any example output, as the results are equivalent to those of the previous section – we simply refactored some code.
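
The same signature shape (an explicit 'a bounding both the input and the borrowed reference, plus a move closure) also applies to plain iterators. As a rough sketch that runs with the standard library only, here is a synchronous analogue; DemoValidRange and cancel_iter are hypothetical names introduced for this illustration, not part of the post's code.

```rust
use std::ops::Range;

/// Hypothetical, simplified stand-in for the post's `ValidRange`.
struct DemoValidRange {
    range: Range<usize>,
}

impl DemoValidRange {
    fn is_valid(&self, i: usize) -> bool {
        self.range.contains(&i)
    }
}

/// Iterator analogue of the `cancel` combinator: the returned `impl Trait`
/// is bounded by `'a` because it holds both `iter` and `valid_range`.
fn cancel_iter<'a, I: Iterator<Item = usize> + 'a>(
    iter: I,
    valid_range: &'a DemoValidRange,
) -> impl Iterator<Item = usize> + 'a {
    // `move` copies the reference into the closure, so the closure does not
    // borrow the local variable `valid_range` itself.
    iter.filter(move |i| valid_range.is_valid(*i))
}

fn main() {
    let valid = DemoValidRange { range: 40..45 };
    let requests = vec![20, 21, 30, 40, 41, 42];
    let kept: Vec<usize> = cancel_iter(requests.into_iter(), &valid).collect();
    println!("{:?}", kept); // prints [40, 41, 42]
}
```

Because the function accepts any iterator of usize, the caller can hand it a vector, a range or another adapter chain, which is the flexibility we were after with streams.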

Conclusion

As we’ve seen in these two blog posts, async programming in Rust is a powerful tool, and in particular streams are a natural extension of iterators in the asynchronous world.

However, even though the Rust compiler generally provides high-quality error messages, these errors can become more complex or mysterious when the whole async toolbox is involved: the new syntax (async {} blocks and .await) combines with lambdas and with the more complex types that arise when chaining stream combinators. In the end, a bit of trial and error against compilation errors is unavoidable, but I hope that you’ve learned how to overcome most of them by applying a few principles. With experience, things get more natural as well.

Here is a recap of a few tips that we’ve learned in this blog post.

  • The future::ready function is sometimes a useful alternative to create a lambda that returns a future. If || async { foo } doesn’t compile, try || { future::ready(foo) }.
  • Another way to circumvent lifetime issues is to create a smaller async block inside of a lambda, in order to capture fewer variables in the resulting future. If |x| async move { foo; bar } doesn’t compile, try |x| { foo; async move { bar } }.
  • The for_each stream combinator takes as input a function returning a future, similarly to .then().
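
The first two tips can be tried out without any external crate. The sketch below is only an approximation of the real setting: keep_if is a hypothetical helper whose predicate has the same shape as the one taken by StreamExt::filter, block_on is a toy busy-polling executor, and std::future::ready stands in for the futures crate's future::ready. It assumes a toolchain recent enough to provide std::task::Wake.

```rust
use std::future::{self, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing: enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor that polls in a loop, for demonstration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical helper: the predicate borrows each item and returns a
/// future whose type `Fut` cannot depend on that borrow.
fn keep_if<F, Fut>(items: &[usize], pred: F) -> Vec<usize>
where
    F: Fn(&usize) -> Fut,
    Fut: Future<Output = bool>,
{
    items.iter().copied().filter(|i| block_on(pred(i))).collect()
}

fn main() {
    let limit = 3;
    let items = [1, 2, 3, 4];

    // Rejected by the compiler: the future would hold the borrowed `i`.
    // keep_if(&items, |i| async move { *i < limit });

    // Tip 1: compute the answer eagerly and wrap it with `future::ready`.
    let a = keep_if(&items, |i| future::ready(*i < limit));

    // Tip 2: use the borrow first, then move only the result into a
    // smaller async block.
    let b = keep_if(&items, |i| {
        let ok = *i < limit;
        async move { ok }
    });

    println!("{:?} {:?}", a, b); // prints [1, 2] [1, 2]
}
```

In both working variants, the returned future owns a plain bool, so its type no longer mentions the lifetime of the borrowed item.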

I’ll also reiterate my main advice from the previous post: break your async code into small functions, which avoids both:

  • long chains of stream combinators with very complex types, as we’ve discussed in the previous post,
  • nested async blocks with complex lifetime requirements, as we’ve seen here.

This post was edited to take into account feedback on reddit, in particular in the “Cancellation” section.



