Concurrent S3 Get Requests in Rust

Most applications hosted in the cloud interface with AWS S3 or an equivalent object store offered by other cloud providers. If you’re looking to get S3 objects using Rust and want to leverage its concurrency primitives to fetch objects in parallel, the code below can serve as a reference.

This post assumes some familiarity with setting up a Rust project, adding crates as dependencies and building/running the binary. It also assumes you have S3 objects you can try this out with.

We’ll start with a simple S3 GET for a single object, extend it to fetch multiple objects and then finally get multiple objects from S3 concurrently.

Get a single object from S3

You can fetch an object from S3, given its bucket name and key, using the Rust code below.

use anyhow::Result;
use aws_sdk_s3 as s3;

#[tokio::main]
async fn main() -> Result<()> {
    // load AWS credentials and region config from the environment
    let config = aws_config::load_from_env().await;
    let client = &s3::Client::new(&config);

    let bucket_name = "your-s3-bucket-name";
    let key = "your/s3/object/key";

    // send the GetObject request for the given bucket/key
    let object = client
        .get_object()
        .bucket(bucket_name)
        .key(key)
        .send()
        .await;
    // read the streamed response body fully into memory as bytes
    let body = object.unwrap().body.collect().await;
    let data_bytes = body.unwrap().into_bytes();

    // do something with the data
    println!("{:?}", data_bytes);

    Ok(())
}

If you’ve interacted with S3 in other languages, the Rust code above might look familiar. The steps are roughly: load the AWS config, instantiate an S3 client, send a get object request using the client and, when it completes, read the contents into memory as bytes. Once you have downloaded the contents of the S3 object as bytes, you can do what you need to with the data – write it to a file, convert it to a string, decode it as JSON and so on. If you’re dealing with a large S3 object and don’t want to load the entire object into memory, the S3 SDK provides ways to consume the data as a buffered stream while it’s being downloaded. This gives you the ability to read it in chunks – for example, read a large CSV line by line and process each line as you go.
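For that large-object case, here is a minimal sketch of reading the body chunk by chunk instead of collecting it all at once. It assumes the SDK versions listed below, where the response body (a ByteStream) can be consumed as a futures stream of byte chunks via StreamExt; newer SDK releases expose the streaming interface slightly differently, so check the aws-sdk-s3 docs for your version.

use anyhow::Result;
use aws_sdk_s3 as s3;
use futures::StreamExt;

// Sketch: process an S3 object chunk by chunk without buffering the whole object.
// Assumes ByteStream can be consumed as a stream of bytes::Bytes chunks.
async fn process_in_chunks(client: &s3::Client, bucket: &str, key: &str) -> Result<()> {
    let object = client.get_object().bucket(bucket).key(key).send().await?;
    let mut body = object.body;
    while let Some(chunk) = body.next().await {
        let chunk = chunk?; // each chunk is a bytes::Bytes buffer
        println!("received a chunk of {} bytes", chunk.len());
        // process the chunk here, e.g. feed it into a line/CSV parser
    }
    Ok(())
}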

The code above makes use of 4 Rust crates: the official AWS S3 SDK (aws-sdk-s3) for the S3 operations, aws-config for loading the AWS credentials and region config, tokio for the async runtime and anyhow for a convenient error-wrapping Result return type from main.

Add these crates to the [dependencies] section of your Cargo.toml file:

anyhow = "1.0"
aws-config = "0.51"
aws-sdk-s3 = "0.21"
tokio = { version = "1", features = ["full"] }

Feel free to adjust the version numbers as you see fit if there are newer versions you’d like to use.

Get multiple objects from S3

To get more than one object from S3, we can extend the single-object version the way you would in other languages: declare a list of keys and loop over them to get the objects. The code would look like this:

use anyhow::Result;
use aws_sdk_s3 as s3;

#[tokio::main]
async fn main() -> Result<()> {
    let config = aws_config::load_from_env().await;
    let client = &s3::Client::new(&config);

    let bucket_name = "your-s3-bucket-name";
    let keys = ["your/s3/object/key", "another/s3/key", "yet/another/key"];

    for key in keys {
        let object = client
            .get_object()
            .bucket(bucket_name)
            .key(key)
            .send()
            .await;
        let body = object.unwrap().body.collect().await;
        let data_bytes = body.unwrap().into_bytes();

        // do something with the data
        println!("{:?}", data_bytes);
    }

    Ok(())
}

This fetches the S3 objects one at a time, in serial.

Get multiple objects from S3 concurrently in Rust

Rust has concurrency primitives that let us make concurrent S3 get requests and reduce the total time it takes to get all the objects. The Rust S3 SDK already has async capabilities built in (notice the await on get_object in the code above). We can leverage this to kick off multiple get_object requests at once.

use anyhow::Result;
use aws_sdk_s3 as s3;
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() -> Result<()> {
    let config = aws_config::load_from_env().await;
    let client = &s3::Client::new(&config);

    let bucket_name = "your-s3-bucket-name";
    let keys = [
        "your/key/prefix/38b9caca".to_string(),
        "your/key/prefix/50b00af8".to_string(),
        "your/key/prefix/654e21d9".to_string(),
        "your/key/prefix/68a041d6".to_string(),
        "your/key/prefix/a0e35420".to_string(),
        "your/key/prefix/cf021b87".to_string(),
        "your/key/prefix/ed8c0e70".to_string(),
    ];

    // turn the keys into a stream of pending get_object futures,
    // allowing at most 10 requests in flight at any one time
    let s3_get_object_requests = stream::iter(&keys)
        .map(|key| async move {
            client
                .get_object()
                .bucket(bucket_name)
                .key(key)
                .send()
                .await
        })
        .buffer_unordered(10);

    // as each request completes, read its body into bytes and
    // accumulate the results into a single vector
    let s3_objects = s3_get_object_requests
        .fold(Vec::new(), |mut accumulator, object| async move {
            let body = object.unwrap().body.collect().await;
            let data_bytes = body.unwrap().into_bytes();
            accumulator.push(data_bytes);
            accumulator
        })
        .await;

    println!("Finished downloading {} objects from S3", s3_objects.len());

    Ok(())
}

There are two key changes from the serial for-loop version:

First, the list of keys we need to fetch is turned into a stream that we iterate over to kick off get_object requests. In addition, we make use of buffer_unordered(n) to control how many concurrent requests are in flight at once. In this case, with n set to 10, there can be up to 10 S3 gets happening at once. Without the buffering in place, we’d end up with as many concurrent S3 gets as there are keys, and with a large key count this could be problematic: too many concurrent requests can lead to throttling by S3, which might result in an overall slower time to completion.

The second change is the use of fold to accumulate the results from the stream of futures into a vector of byte buffers as the S3 get object requests complete.
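If fold feels unfamiliar, an equivalent formulation (a sketch, not taken from the code above) is to map each completed response to its bytes with then and gather them with collect, both from the same futures StreamExt trait. This would be a drop-in replacement for the stream/fold section of main above:

    // same key stream and concurrency limit as before, but gathering
    // results with then + collect instead of fold
    let s3_objects: Vec<_> = stream::iter(&keys)
        .map(|key| async move {
            client
                .get_object()
                .bucket(bucket_name)
                .key(key)
                .send()
                .await
        })
        .buffer_unordered(10)
        // read each completed response's body into bytes
        .then(|object| async move {
            object.unwrap().body.collect().await.unwrap().into_bytes()
        })
        .collect()
        .await;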

The futures crate is used for the stream processing capabilities, so you’ll need to add futures = "0.3" to the list of [dependencies] in your Cargo.toml.
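With that addition, the [dependencies] section from earlier becomes:

anyhow = "1.0"
aws-config = "0.51"
aws-sdk-s3 = "0.21"
futures = "0.3"
tokio = { version = "1", features = ["full"] }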

Summary

By combining stream processing from the futures crate, the async runtime provided by tokio and the S3 get operation already implemented as an async action in the AWS S3 SDK, we are able to fetch multiple S3 objects concurrently.

Concurrent programming can feel tricky to get right and requires forethought to ensure you’re using the right primitives. The results are worth it, as it can lead to a considerable improvement in the total time taken to perform multiple operations – in this case, to get all the S3 objects.
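If you want to see the difference for yourself, one simple way (not part of the code above) is to wrap the download section of each version with a timer from std::time::Instant and compare the elapsed times:

    use std::time::Instant;

    let started = Instant::now();
    // ... run the serial or concurrent download code here ...
    println!("downloaded {} objects in {:?}", s3_objects.len(), started.elapsed());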

Dec 2022

రామ్ . राम . Rām