Author: Zhixing Zhang [account@neoto.xin]
Committer: GitHub [noreply@github.com] Thu, 10 Jun 2021 23:40:13 +0000
Hash: c99124a19eac7007b521a079054bb0abc7c4a0cc
Timestamp: Thu, 10 Jun 2021 23:40:13 +0000 (3 years ago)

+235 -36 +/-10 browse
Use S3 signed URL for upload & allow specifying download domain (#27)
Use S3 signed URL for upload & allow specifying download domain (#27)

1diff --git a/.gitignore b/.gitignore
2index 9340db5..c16b681 100644
3--- a/.gitignore
4+++ b/.gitignore
5 @@ -3,3 +3,4 @@
6 /test.sh
7 /cache/
8 .env
9+ /.idea
10 diff --git a/src/app.rs b/src/app.rs
11index 481276b..c881d5e 100644
12--- a/src/app.rs
13+++ b/src/app.rs
14 @@ -35,6 +35,9 @@ use crate::error::Error;
15 use crate::hyperext::RequestExt;
16 use crate::lfs;
17 use crate::storage::{LFSObject, Namespace, Storage, StorageKey};
18+ use std::time::Duration;
19+
20+ const UPLOAD_EXPIRATION: Duration = Duration::from_secs(30 * 60);
21
22 async fn from_json<T>(mut body: Body) -> Result<T, Error>
23 where
24 @@ -261,8 +264,9 @@ where
25
26 let (namespace, _) = key.into_parts();
27 Ok(basic_response(
28- uri, object, operation, size, namespace,
29- ))
30+ uri, &storage, object, operation, size, namespace,
31+ )
32+ .await)
33 }
34 });
35
36 @@ -292,8 +296,9 @@ where
37 }
38 }
39
40- fn basic_response<E>(
41+ async fn basic_response<E, S>(
42 uri: Uri,
43+ storage: &S,
44 object: lfs::RequestObject,
45 op: lfs::Operation,
46 size: Result<Option<u64>, E>,
47 @@ -301,6 +306,7 @@ fn basic_response<E>(
48 ) -> lfs::ResponseObject
49 where
50 E: fmt::Display,
51+ S: Storage,
52 {
53 if let Ok(Some(size)) = size {
54 // Ensure that the client and server agree on the size of the object.
55 @@ -345,21 +351,13 @@ where
56 }
57 };
58
59- let href = format!("{}api/{}/object/{}", uri, namespace, object.oid);
60-
61- let action = lfs::Action {
62- href,
63- header: None,
64- expires_in: None,
65- expires_at: None,
66- };
67-
68 match op {
69 lfs::Operation::Upload => {
70 // If the object does exist, then we should not return any action.
71 //
72 // If the object does not exist, then we should return an upload
73 // action.
74+ let upload_expiry_secs = UPLOAD_EXPIRATION.as_secs() as i32;
75 match size {
76 Some(size) => lfs::ResponseObject {
77 oid: object.oid,
78 @@ -375,7 +373,26 @@ where
79 authenticated: Some(true),
80 actions: Some(lfs::Actions {
81 download: None,
82- upload: Some(action),
83+ upload: Some(lfs::Action {
84+ href: storage
85+ .upload_url(
86+ &StorageKey::new(
87+ namespace.clone(),
88+ object.oid,
89+ ),
90+ UPLOAD_EXPIRATION,
91+ )
92+ .await
93+ .unwrap_or_else(|| {
94+ format!(
95+ "{}api/{}/object/{}",
96+ uri, namespace, object.oid
97+ )
98+ }),
99+ header: None,
100+ expires_in: Some(upload_expiry_secs),
101+ expires_at: None,
102+ }),
103 verify: Some(lfs::Action {
104 href: format!(
105 "{}api/{}/objects/verify",
106 @@ -399,7 +416,22 @@ where
107 error: None,
108 authenticated: Some(true),
109 actions: Some(lfs::Actions {
110- download: Some(action),
111+ download: Some(lfs::Action {
112+ href: storage
113+ .public_url(&StorageKey::new(
114+ namespace.clone(),
115+ object.oid,
116+ ))
117+ .unwrap_or_else(|| {
118+ format!(
119+ "{}api/{}/object/{}",
120+ uri, namespace, object.oid
121+ )
122+ }),
123+ header: None,
124+ expires_in: None,
125+ expires_at: None,
126+ }),
127 upload: None,
128 verify: None,
129 }),
130 diff --git a/src/main.rs b/src/main.rs
131index a67626e..1975421 100644
132--- a/src/main.rs
133+++ b/src/main.rs
134 @@ -71,13 +71,15 @@ enum Backend {
135
136 #[derive(StructOpt)]
137 struct GlobalArgs {
138- /// Host or address to listen on.
139- #[structopt(
140- long = "host",
141- default_value = "0.0.0.0:8080",
142- env = "RUDOLFS_HOST"
143- )]
144- host: String,
145+ /// The host or address to listen on. If this is not specified, then
146+ /// `0.0.0.0` is used where the port can be specified with `--port`
147+ /// (port 8080 is used by default if that is also not specified).
148+ #[structopt(long = "host", env = "RUDOLFS_HOST")]
149+ host: Option<String>,
150+
151+ /// The port to bind to. This is only used if `--host` is not specified.
152+ #[structopt(long = "port", default_value = "8080", env = "PORT")]
153+ port: u16,
154
155 /// Encryption key to use.
156 #[structopt(
157 @@ -119,6 +121,11 @@ struct S3Args {
158 /// Amazon S3 path prefix to use.
159 #[structopt(long, default_value = "lfs", env = "RUDOLFS_S3_PREFIX")]
160 prefix: String,
161+
162+ /// The base URL of your CDN. If specified, then all download URLs will be
163+ /// prefixed with this URL.
164+ #[structopt(long = "cdn", env = "RUDOLFS_S3_CDN")]
165+ cdn: Option<String>,
166 }
167
168 #[derive(StructOpt)]
169 @@ -144,12 +151,13 @@ impl Args {
170 logger_builder.init();
171
172 // Find a socket address to bind to. This will resolve domain names.
173- let addr = self
174- .global
175- .host
176- .to_socket_addrs()?
177- .next()
178- .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 8080)));
179+ let addr = match self.global.host {
180+ Some(ref host) => host
181+ .to_socket_addrs()?
182+ .next()
183+ .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 8080))),
184+ None => SocketAddr::from(([0, 0, 0, 0], self.global.port)),
185+ };
186
187 log::info!("Initializing storage...");
188
189 @@ -168,7 +176,7 @@ impl S3Args {
190 addr: SocketAddr,
191 global_args: GlobalArgs,
192 ) -> Result<(), Box<dyn std::error::Error>> {
193- let s3 = S3::new(self.bucket, self.prefix)
194+ let s3 = S3::new(self.bucket, self.prefix, self.cdn)
195 .map_err(Error::from)
196 .await?;
197
198 diff --git a/src/storage/cached.rs b/src/storage/cached.rs
199index a8d78e6..f7c1a4d 100644
200--- a/src/storage/cached.rs
201+++ b/src/storage/cached.rs
202 @@ -20,6 +20,7 @@
203 use std::fmt;
204 use std::io;
205 use std::sync::Arc;
206+ use std::time::Duration;
207
208 use async_trait::async_trait;
209 use bytes::Bytes;
210 @@ -387,4 +388,16 @@ where
211 Some(self.max_size)
212 }
213 }
214+
215+ fn public_url(&self, key: &StorageKey) -> Option<String> {
216+ self.storage.public_url(key)
217+ }
218+
219+ async fn upload_url(
220+ &self,
221+ key: &StorageKey,
222+ expires_in: Duration,
223+ ) -> Option<String> {
224+ self.storage.upload_url(key, expires_in).await
225+ }
226 }
227 diff --git a/src/storage/disk.rs b/src/storage/disk.rs
228index 1dc6a45..86dd977 100644
229--- a/src/storage/disk.rs
230+++ b/src/storage/disk.rs
231 @@ -22,6 +22,7 @@ use std::fs::Metadata;
232 use std::io;
233 use std::path::PathBuf;
234 use std::str::FromStr;
235+ use std::time::Duration;
236
237 use async_stream::try_stream;
238 use async_trait::async_trait;
239 @@ -248,6 +249,18 @@ impl Storage for Backend {
240 }
241 }))
242 }
243+
244+ fn public_url(&self, _key: &StorageKey) -> Option<String> {
245+ None
246+ }
247+
248+ async fn upload_url(
249+ &self,
250+ _key: &StorageKey,
251+ _expires_in: Duration,
252+ ) -> Option<String> {
253+ None
254+ }
255 }
256
257 /// A simple bytes codec that keeps track of its length.
258 diff --git a/src/storage/encrypt.rs b/src/storage/encrypt.rs
259index 71dd0d5..6d6d87a 100644
260--- a/src/storage/encrypt.rs
261+++ b/src/storage/encrypt.rs
262 @@ -19,6 +19,7 @@
263 // SOFTWARE.
264 use chacha::{ChaCha, KeyStream};
265 use std::io;
266+ use std::time::Duration;
267
268 use async_trait::async_trait;
269 use bytes::{Bytes, BytesMut};
270 @@ -121,4 +122,16 @@ where
271 async fn max_size(&self) -> Option<u64> {
272 self.storage.max_size().await
273 }
274+
275+ fn public_url(&self, key: &StorageKey) -> Option<String> {
276+ self.storage.public_url(key)
277+ }
278+
279+ async fn upload_url(
280+ &self,
281+ key: &StorageKey,
282+ expires_in: Duration,
283+ ) -> Option<String> {
284+ self.storage.upload_url(key, expires_in).await
285+ }
286 }
287 diff --git a/src/storage/mod.rs b/src/storage/mod.rs
288index 27039d7..9a3ea42 100644
289--- a/src/storage/mod.rs
290+++ b/src/storage/mod.rs
291 @@ -41,6 +41,7 @@ use std::fmt;
292 use std::io;
293 use std::pin::Pin;
294 use std::sync::Arc;
295+ use std::time::Duration;
296
297 use async_trait::async_trait;
298 use bytes::Bytes;
299 @@ -224,6 +225,16 @@ pub trait Storage {
300 async fn max_size(&self) -> Option<u64> {
301 None
302 }
303+
304+ /// Returns a publicly accessible URL
305+ fn public_url(&self, key: &StorageKey) -> Option<String>;
306+
307+ /// Returns a signed URL
308+ async fn upload_url(
309+ &self,
310+ key: &StorageKey,
311+ expires_in: Duration,
312+ ) -> Option<String>;
313 }
314
315 #[async_trait]
316 @@ -274,4 +285,16 @@ where
317 async fn max_size(&self) -> Option<u64> {
318 self.as_ref().max_size().await
319 }
320+
321+ fn public_url(&self, key: &StorageKey) -> Option<String> {
322+ self.as_ref().public_url(key)
323+ }
324+
325+ async fn upload_url(
326+ &self,
327+ key: &StorageKey,
328+ expires_in: Duration,
329+ ) -> Option<String> {
330+ self.as_ref().upload_url(key, expires_in).await
331+ }
332 }
333 diff --git a/src/storage/retrying.rs b/src/storage/retrying.rs
334index 246c28c..6d3032a 100644
335--- a/src/storage/retrying.rs
336+++ b/src/storage/retrying.rs
337 @@ -18,6 +18,7 @@
338 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
339 // SOFTWARE.
340 use std::sync::Arc;
341+ use std::time::Duration;
342
343 use async_trait::async_trait;
344 use backoff::future::retry;
345 @@ -90,4 +91,16 @@ where
346 async fn max_size(&self) -> Option<u64> {
347 self.storage.max_size().await
348 }
349+
350+ fn public_url(&self, key: &StorageKey) -> Option<String> {
351+ self.storage.public_url(key)
352+ }
353+
354+ async fn upload_url(
355+ &self,
356+ key: &StorageKey,
357+ expires_in: Duration,
358+ ) -> Option<String> {
359+ self.storage.upload_url(key, expires_in).await
360+ }
361 }
362 diff --git a/src/storage/s3.rs b/src/storage/s3.rs
363index de1b05b..bcafa75 100644
364--- a/src/storage/s3.rs
365+++ b/src/storage/s3.rs
366 @@ -25,7 +25,9 @@ use derive_more::{Display, From};
367 use futures::{stream, stream::TryStreamExt};
368 use http::StatusCode;
369 use rusoto_core::{HttpClient, Region, RusotoError};
370- use rusoto_credential::{AutoRefreshingProvider, ProvideAwsCredentials};
371+ use rusoto_credential::{
372+ AutoRefreshingProvider, DefaultCredentialsProvider, ProvideAwsCredentials,
373+ };
374 use rusoto_s3::{
375 GetObjectError, GetObjectRequest, HeadBucketError, HeadBucketRequest,
376 HeadObjectError, HeadObjectRequest, PutObjectError, PutObjectRequest,
377 @@ -34,6 +36,8 @@ use rusoto_s3::{
378 use rusoto_sts::WebIdentityProvider;
379
380 use super::{LFSObject, Storage, StorageKey, StorageStream};
381+ use rusoto_s3::util::{PreSignedRequest, PreSignedRequestOption};
382+ use std::time::Duration;
383
384 #[derive(Debug, From, Display)]
385 pub enum Error {
386 @@ -109,17 +113,26 @@ pub struct Backend<C = S3Client> {
387 /// S3 client.
388 client: C,
389
390+ // Aws Credentials. Used for signing URLs.
391+ credential_provider: Box<dyn ProvideAwsCredentials + Send + Sync + 'static>,
392+
393 /// Name of the bucket to use.
394 bucket: String,
395
396 /// Prefix for objects.
397 prefix: String,
398+
399+ /// URL for the CDN. Example: https://lfscdn.myawesomegit.com
400+ cdn: Option<String>,
401+
402+ region: Region,
403 }
404
405 impl Backend {
406 pub async fn new(
407 bucket: String,
408 mut prefix: String,
409+ cdn: Option<String>,
410 ) -> Result<Self, Error> {
411 // Ensure the prefix doesn't end with a '/'.
412 while prefix.ends_with('/') {
413 @@ -154,18 +167,34 @@ impl Backend {
414
415 // Check if there is any k8s credential provider. If there is, use it.
416 let k8s_provider = WebIdentityProvider::from_k8s_env();
417- let client = if k8s_provider.credentials().await.is_ok() {
418+
419+ let (client, credential_provider): (
420+ S3Client,
421+ Box<dyn ProvideAwsCredentials + Send + Sync + 'static>,
422+ ) = if k8s_provider.credentials().await.is_ok() {
423 log::info!("Using credentials from Kubernetes");
424- S3Client::new_with(
425+ let provider = AutoRefreshingProvider::new(k8s_provider)?;
426+ let client = S3Client::new_with(
427 HttpClient::new()?,
428- AutoRefreshingProvider::new(k8s_provider)?,
429- region,
430- )
431+ provider.clone(),
432+ region.clone(),
433+ );
434+ (client, Box::new(provider))
435 } else {
436- S3Client::new(region)
437+ let client = S3Client::new(region.clone());
438+ let provider = DefaultCredentialsProvider::new()?;
439+ (client, Box::new(provider))
440 };
441
442- Backend::with_client(client, bucket, prefix).await
443+ Backend::with_client(
444+ client,
445+ bucket,
446+ prefix,
447+ cdn,
448+ region,
449+ credential_provider,
450+ )
451+ .await
452 }
453 }
454
455 @@ -174,6 +203,11 @@ impl<C> Backend<C> {
456 client: C,
457 bucket: String,
458 prefix: String,
459+ cdn: Option<String>,
460+ region: Region,
461+ credential_provider: Box<
462+ dyn ProvideAwsCredentials + Send + Sync + 'static,
463+ >,
464 ) -> Result<Self, Error>
465 where
466 C: S3 + Clone,
467 @@ -207,6 +241,9 @@ impl<C> Backend<C> {
468 client,
469 bucket,
470 prefix,
471+ cdn,
472+ region,
473+ credential_provider,
474 })
475 }
476
477 @@ -297,4 +334,37 @@ where
478 fn list(&self) -> StorageStream<(StorageKey, u64), Self::Error> {
479 Box::pin(stream::empty())
480 }
481+
482+ fn public_url(&self, key: &StorageKey) -> Option<String> {
483+ if let Some(cdn) = self.cdn.as_ref() {
484+ Some(format!("{}/{}", cdn, self.key_to_path(key)))
485+ } else {
486+ None
487+ }
488+ }
489+
490+ async fn upload_url(
491+ &self,
492+ key: &StorageKey,
493+ expires_in: Duration,
494+ ) -> Option<String> {
495+ let request = PutObjectRequest {
496+ bucket: self.bucket.clone(),
497+ key: self.key_to_path(&key),
498+ ..Default::default()
499+ };
500+ let credentials = self.credential_provider.credentials().await;
501+ let credentials = match credentials {
502+ Ok(credentials) => credentials,
503+ Err(_) => {
504+ return None;
505+ }
506+ };
507+ let presigned_url = request.get_presigned_url(
508+ &self.region,
509+ &credentials,
510+ &PreSignedRequestOption { expires_in },
511+ );
512+ Some(presigned_url)
513+ }
514 }
515 diff --git a/src/storage/verify.rs b/src/storage/verify.rs
516index 4f4e85c..826e801 100644
517--- a/src/storage/verify.rs
518+++ b/src/storage/verify.rs
519 @@ -19,6 +19,7 @@
520 // SOFTWARE.
521 use std::io;
522 use std::sync::Arc;
523+ use std::time::Duration;
524
525 use async_trait::async_trait;
526 use derive_more::{Display, From};
527 @@ -160,4 +161,16 @@ where
528 async fn max_size(&self) -> Option<u64> {
529 self.storage.max_size().await
530 }
531+
532+ fn public_url(&self, key: &StorageKey) -> Option<String> {
533+ self.storage.public_url(key)
534+ }
535+
536+ async fn upload_url(
537+ &self,
538+ key: &StorageKey,
539+ expires_in: Duration,
540+ ) -> Option<String> {
541+ self.storage.upload_url(key, expires_in).await
542+ }
543 }