Author: Toshinori Notake [toshi.notake.43568@gmail.com]
Committer: GitHub [noreply@github.com] Sat, 31 Jul 2021 04:20:01 +0000
Hash: a01edf939cf136b64f91aa50b1adad751b28853a
Timestamp: Sat, 31 Jul 2021 04:20:01 +0000 (3 years ago)

+114 -14 +/-1 browse
Support multipart S3 uploads (#36)
Support multipart S3 uploads (#36)

Adds support for >5GB uploads. Each part is also retried if there is a transient S3 error.

Co-authored-by: Jason White <github@jasonwhite.io>
1diff --git a/src/storage/s3.rs b/src/storage/s3.rs
2index abc9f81..bae975b 100644
3--- a/src/storage/s3.rs
4+++ b/src/storage/s3.rs
5 @@ -20,20 +20,25 @@
6 use async_trait::async_trait;
7 use backoff::future::retry;
8 use backoff::ExponentialBackoff;
9- use bytes::Bytes;
10+ use bytes::{Bytes, BytesMut};
11 use derive_more::{Display, From};
12 use futures::{stream, stream::TryStreamExt};
13- use http::StatusCode;
14+ use http::{HeaderMap, StatusCode};
15+ use rusoto_core::request::BufferedHttpResponse;
16 use rusoto_core::{HttpClient, Region, RusotoError};
17 use rusoto_credential::{
18 AutoRefreshingProvider, DefaultCredentialsProvider, ProvideAwsCredentials,
19 };
20 use rusoto_s3::{
21- GetObjectError, GetObjectRequest, HeadBucketError, HeadBucketRequest,
22- HeadObjectError, HeadObjectRequest, PutObjectError, PutObjectRequest,
23- S3Client, StreamingBody, S3,
24+ CompleteMultipartUploadError, CompleteMultipartUploadRequest,
25+ CompletedMultipartUpload, CompletedPart, CreateMultipartUploadError,
26+ CreateMultipartUploadRequest, GetObjectError, GetObjectRequest,
27+ HeadBucketError, HeadBucketRequest, HeadObjectError, HeadObjectRequest,
28+ PutObjectError, PutObjectRequest, S3Client, StreamingBody, UploadPartError,
29+ UploadPartRequest, S3,
30 };
31 use rusoto_sts::WebIdentityProvider;
32+ use tokio::io::AsyncReadExt;
33
34 use super::{LFSObject, Storage, StorageKey, StorageStream};
35 use rusoto_s3::util::{PreSignedRequest, PreSignedRequestOption};
36 @@ -46,8 +51,13 @@ type BoxedCredentialProvider =
37 pub enum Error {
38 Get(RusotoError<GetObjectError>),
39 Put(RusotoError<PutObjectError>),
40+ CreateMultipart(RusotoError<CreateMultipartUploadError>),
41+ Upload(RusotoError<UploadPartError>),
42+ CompleteMultipart(RusotoError<CompleteMultipartUploadError>),
43 Head(RusotoError<HeadObjectError>),
44
45+ Stream(std::io::Error),
46+
47 /// Initialization error.
48 Init(InitError),
49
50 @@ -284,17 +294,107 @@ where
51 key: StorageKey,
52 value: LFSObject,
53 ) -> Result<(), Self::Error> {
54- let (len, stream) = value.into_parts();
55+ let (_len, stream) = value.into_parts();
56+
57+ let mu_response = retry(ExponentialBackoff::default(), || async {
58+ Ok(self
59+ .client
60+ .create_multipart_upload(CreateMultipartUploadRequest {
61+ bucket: self.bucket.clone(),
62+ key: self.key_to_path(&key),
63+ ..Default::default()
64+ })
65+ .await?)
66+ })
67+ .await?;
68
69- let request = PutObjectRequest {
70- bucket: self.bucket.clone(),
71- key: self.key_to_path(&key),
72- content_length: Some(len as i64),
73- body: Some(StreamingBody::new(stream)),
74- ..Default::default()
75- };
76+ // Okay to unwrap. This would only be None there is a bug in either
77+ // Rusoto or S3 itself.
78+ let upload_id = mu_response.upload_id.unwrap();
79+
80+ // 100 MB
81+ const CHUNK_SIZE: usize = 100 * 1024 * 1024;
82+
83+ let mut buffer = BytesMut::with_capacity(CHUNK_SIZE);
84+ let mut part_number = 1;
85+ let mut completed_parts = Vec::new();
86+ let mut streaming_body = StreamingBody::new(stream).into_async_read();
87+
88+ loop {
89+ let size = streaming_body.read_buf(&mut buffer).await?;
90+
91+ if buffer.len() < CHUNK_SIZE && size != 0 {
92+ continue;
93+ }
94+
95+ let chunk = buffer.split().freeze();
96+
97+ let up_response = retry(ExponentialBackoff::default(), || async {
98+ let chunk = chunk.clone();
99+ let chunk_len = chunk.len();
100+ let body =
101+ StreamingBody::new(Box::pin(stream::once(async move {
102+ Ok(chunk)
103+ })));
104+
105+ let req = UploadPartRequest {
106+ content_length: Some(chunk_len as i64),
107+ body: Some(body),
108+ bucket: self.bucket.clone(),
109+ key: self.key_to_path(&key),
110+ part_number,
111+ upload_id: upload_id.clone(),
112+ ..Default::default()
113+ };
114+ Ok(self.client.upload_part(req).await?)
115+ })
116+ .await?;
117+
118+ completed_parts.push(CompletedPart {
119+ e_tag: up_response.e_tag.clone(),
120+ part_number: Some(part_number),
121+ });
122+
123+ if size == 0 {
124+ // The stream has ended.
125+ break;
126+ } else {
127+ part_number += 1;
128+ }
129+ }
130+
131+ // Complete the upload.
132+ retry(ExponentialBackoff::default(), || async {
133+ let req = CompleteMultipartUploadRequest {
134+ bucket: self.bucket.clone(),
135+ key: self.key_to_path(&key),
136+ multipart_upload: Some(CompletedMultipartUpload {
137+ parts: Some(completed_parts.clone()),
138+ }),
139+ upload_id: upload_id.clone(),
140+ ..Default::default()
141+ };
142+
143+ let output = self.client.complete_multipart_upload(req).await?;
144+
145+ // Workaround: https://github.com/rusoto/rusoto/issues/1936
146+ // Rusoto may return `Ok` when there is a failure.
147+ if output.location == None
148+ && output.e_tag == None
149+ && output.bucket == None
150+ && output.key == None
151+ {
152+ return Err(RusotoError::Unknown(BufferedHttpResponse {
153+ status: StatusCode::from_u16(500).unwrap(),
154+ headers: HeaderMap::with_capacity(0),
155+ body: Bytes::from_static(b"HTTP 500 internal error"),
156+ })
157+ .into());
158+ }
159
160- self.client.put_object(request).await?;
161+ Ok(())
162+ })
163+ .await?;
164
165 Ok(())
166 }