kanejaku.org

Q: How to limit the number of inflight async tasks in Rust?

4 Nov 2020

Let’s say we want to have some concurrency for an async function in Rust. We want to execute the function thousands of times with different parameters (e.g. indexing many files). We also want to limit the number of inflight tasks so that the load on the system doesn’t exceed a certain level.

struct Task { /* ... */ }
struct TaskResult { /* ... */ }

async fn do_task(task: Task) -> Result<TaskResult> {
  // ...
}

const N: usize = 5;

async fn run_tasks(tasks: Vec<Task>) {
  // Q: How to limit the number of inflight tasks to N?
}

How would you do that?

I had no idea how to search the web for the question. After spending some time, a random search pointed out StreamExt::buffered().

Here is my current approach:

async fn run_tasks(tasks: Vec<Task>) {
  // Convert Vec<Task> to Vec<impl Future<Output=TaskResult>>
  let tasks = tasks.into_iter().map(|t| do_task(t));
  // Convert Vec<impl Future<Output=TaskResult>> to impl Stream<Item=TaskResult>
  let tasks = stream::iter(tasks);
  // Limit the number of tasks in the stream to consume at the same time.
  let tasks = tasks.buffered(N);
  while let Some(result) = tasks.next().await {
    // ...
  }
}

It worked but I guess that I’m wrong in the first place. Code is simple but I feel I’m doing something strange. I think it’s a common situation where we want to restrict the number of inflight tasks. I suspect that there is an idiomatic way to do that.