How can I upload a file to S3 using rusoto without reading the file content into memory (i.e. streamed)?
With this code:
use std::fs::File;
use std::io::BufReader;
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody};
fn main() {
    let file = File::open("input.txt").unwrap();
    let mut reader = BufReader::new(file);
    let s3_client = S3Client::new(Region::UsEast1);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("example_bucket"),
        key: "example_filename".to_string(),
//        this works:
//      body: Some("example string".to_owned().into_bytes().into()),
//        this doesn't:
        body: Some(StreamingBody::new(reader)),
        ..Default::default()
    }).sync().expect("could not upload");
}
I receive the following error:
error[E0277]: the trait bound `std::io::BufReader<std::fs::File>: futures::stream::Stream` is not satisfied
  --> src/bin/example.rs:18:20
   |
18 |         body: Some(StreamingBody::new(reader)),
   |                    ^^^^^^^^^^^^^^^^^^ the trait `futures::stream::Stream` is not implemented for `std::io::BufReader<std::fs::File>`
   |
   = note: required by `rusoto_core::stream::ByteStream::new`
Okay. Strap yourself in, this is a fun one.
StreamingBody is an alias for ByteStream, which itself takes a parameter type S: Stream<Item = Bytes, Error = Error> + Send + 'static. In short, it needs to be a stream of bytes.
BufReader, evidently, does not implement this trait, as it predates futures and streams by a long while. There is also no easy conversion to Stream<Item = Bytes> that you can use to implicitly convert into this.
The reason the first (commented) example works is because String::into_bytes().into() will follow the typecast chain: String -> Vec<u8> -> ByteStream thanks to the implementation of From<Vec<u8>> on ByteStream.
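To make that conversion chain concrete, here is a minimal std-only sketch. `Body` and `body_from_string` are hypothetical stand-ins made up for illustration, not rusoto types; the only point is how `.into()` resolves through a `From<Vec<u8>>` implementation, and why a type without such an implementation (like BufReader) gets rejected.

```rust
/// Hypothetical stand-in for a body type such as ByteStream.
/// This is NOT the real rusoto type, just enough to show the From/Into mechanics.
struct Body {
    chunks: Vec<Vec<u8>>,
}

/// The one conversion this type offers: a whole in-memory buffer becomes a single chunk.
impl From<Vec<u8>> for Body {
    fn from(bytes: Vec<u8>) -> Self {
        Body { chunks: vec![bytes] }
    }
}

/// String -> Vec<u8> via into_bytes(), then Vec<u8> -> Body via the From impl above.
fn body_from_string(s: String) -> Body {
    s.into_bytes().into()
}
```

Since there is no `impl From<BufReader<File>> for Body` in this sketch, passing a reader where a `Body` is expected fails to compile, which mirrors the situation in the question.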
Now that we know why this doesn't work, we can fix it. There is a fast way, and then there is a right way. I'll show you both.
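The fast (but not optimal) way is simply to call read_to_end() on the file (the method comes from the std::io::Read trait). This will fill up a Vec<u8>, which you can then use like you did before:
 use std::io::Read; // brings read_to_end() into scope
 // note: `file` has to be declared `mut` for this call
 let mut buf: Vec<u8> = Vec::new();
 file.read_to_end(&mut buf)?;
 // buf now contains the entire file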
This is inefficient and suboptimal for two reasons:
1. read_to_end() is a blocking call. Depending on where you are reading the file from, this blocking time may prove unreasonable.
2. The whole file has to fit in memory: you need at least as much free RAM as the file has bytes (plus the Vec definition + some extra we don't really care about).
The good way turns your file into a structure implementing AsyncRead. From this, we can then form a Stream.
Since you already have a std::fs::File, we will first convert it into a tokio::fs::File. This implements AsyncRead, which is very important for later:
let tokio_file = tokio::fs::File::from_std(file);
From this, we sadly need to do some pipework to get it into a Stream. Multiple crates have implemented it; the way to do so from scratch is the following:
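use futures::TryStreamExt; // provides map_ok()
use tokio_util::codec;
// Each item of the framed stream is an io::Result<BytesMut>;
// map_ok() only touches the Ok side and leaves errors as they are.
let byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new())
   .map_ok(|chunk| chunk.to_vec());
tokio_util::codec::FramedRead implements Stream with an item type based on our decoder. As our decoder is BytesCodec, the raw stream yields BytesMut chunks (each wrapped in an io::Result under the futures 0.3 Stream trait, which this snippet assumes). The map_ok step then copies each chunk into a Vec<u8>. If the consumer needs Bytes instead, as ByteStream does, chunk.freeze() converts a BytesMut into Bytes without copying.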
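Conceptually, the framed stream hands the file out as a sequence of bounded chunks instead of one big buffer. Here is a minimal, synchronous, std-only sketch of that idea. `Chunks` and `chunk_sizes` are hypothetical names made up for this illustration; this is a blocking analogue, not the tokio or rusoto API, and it only shows why memory use stays around one chunk at a time.

```rust
use std::io::{self, Cursor, Read};

/// Hypothetical helper: yields a reader's contents in chunks of at most
/// `chunk_size` bytes, so only one chunk is held in memory at a time.
struct Chunks<R> {
    reader: R,
    chunk_size: usize,
}

impl<R: Read> Chunks<R> {
    fn new(reader: R, chunk_size: usize) -> Self {
        Chunks { reader, chunk_size }
    }
}

impl<R: Read> Iterator for Chunks<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut buf = vec![0u8; self.chunk_size];
        match self.reader.read(&mut buf) {
            // A read of zero bytes means end of input: stop iterating.
            Ok(0) => None,
            // Keep only the bytes that were actually read.
            Ok(n) => {
                buf.truncate(n);
                Some(Ok(buf))
            }
            // Pass I/O errors on to the consumer, much like a stream of Results.
            Err(e) => Some(Err(e)),
        }
    }
}

/// Usage sketch: collect the size of every chunk read from an in-memory buffer.
fn chunk_sizes(data: Vec<u8>, chunk_size: usize) -> io::Result<Vec<usize>> {
    Chunks::new(Cursor::new(data), chunk_size)
        .map(|chunk| chunk.map(|bytes| bytes.len()))
        .collect()
}
```

The async version built with FramedRead follows the same shape, except that waiting for the next chunk does not block the thread.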
As the playground doesn't know rusoto_core, I cannot show you the full flow. I can, however, show you that you can generate a Stream<Item = Vec<u8>, Error = io::Error>, which is the crux of this: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=38e4ae8be0d70abd134b5331d6bf4133
Here's a version with the upcoming Rusoto async-await syntax (for get_object, although it should be straightforward to tweak for upload), possibly out for public consumption in Rusoto 0.43:
https://github.com/brainstorm/rusoto-s3-async-await
Namely:
// Imports this snippet relies on (bytes, futures 0.3 and rusoto_s3 crates):
use bytes::BytesMut;
use futures::TryStreamExt; // map_ok() and try_concat()
use rusoto_s3::{GetObjectRequest, S3, S3Client};
pub async fn bucket_obj_bytes(client: S3Client, bucket: String, _prefix: String, object: String) {
    let get_req = GetObjectRequest {
        bucket,
        key: object,
        ..Default::default()
    };
    let result = client
        .get_object(get_req)
        .await
        .expect("Couldn't GET object");
    println!("get object result: {:#?}", result);
    let stream = result.body.unwrap();
    let body = stream.map_ok(|b| BytesMut::from(&b[..])).try_concat().await.unwrap();
    assert!(body.len() > 0);
    dbg!(body);
}
Which is essentially borrowed from the integration testsuite itself, where you can find snippets of the upload version too.