Commit
Author: Jason White [jwhite@esri.com]
Hash: 1ccd8a5e83fb9b53b2699db25edbb10db812d7c3
Timestamp: Wed, 13 Mar 2019 20:23:11 +0000 (6 years ago)

+333 -166 +/-10 browse
Add namespace support
Add namespace support

Fixes #1.
1diff --git a/Cargo.toml b/Cargo.toml
2index a9ec405..07da86b 100644
3--- a/Cargo.toml
4+++ b/Cargo.toml
5 @@ -1,7 +1,7 @@
6 [package]
7 name = "rudolfs"
8 version = "0.1.0"
9- authors = ["Jason White"]
10+ authors = ["Jason White <jasonaw0@gmail.com>"]
11 edition = "2018"
12 description = """
13 A high-performance, caching Git LFS server with an AWS S3 back-end.
14 diff --git a/README.md b/README.md
15index 524a0a4..f0b4d23 100644
16--- a/README.md
17+++ b/README.md
18 @@ -15,6 +15,10 @@ A high-performance, caching Git LFS server with an AWS S3 back-end.
19
20 - Encryption of LFS objects in both the cache and in permanent storage.
21
22+ - Separation of GitHub organizations and projects. Just specify the org and
23+ project names in the URL and they are automatically created. If two projects
24+ share many LFS objects, have them use the same URL to save on storage space.
25+
26 ## Running It
27
28 ### Generate an encryption key
29 @@ -118,7 +122,7 @@ Add `.lfsconfig` to the root of your Git repository:
30
31 ```
32 [lfs]
33- url = "http://gitlfs.example.com:8080/"
34+ url = "http://gitlfs.example.com:8080/api/my-org/my-project"
35 ```
36
37 Optionally, I also recommend changing these global settings to speed things up:
38 diff --git a/src/app.rs b/src/app.rs
39index d654ece..8d4efc1 100644
40--- a/src/app.rs
41+++ b/src/app.rs
42 @@ -34,7 +34,7 @@ use hyper::{self, service::Service, Chunk, Method, Request, Response};
43 use crate::error::Error;
44 use crate::hyperext::{into_request, Body, IntoResponse, RequestExt};
45 use crate::lfs;
46- use crate::storage::{LFSObject, Storage};
47+ use crate::storage::{LFSObject, Namespace, Storage, StorageKey};
48
49 /// Shared state for all instances of the `App` service.
50 pub struct State<S> {
51 @@ -63,13 +63,83 @@ where
52 App { state }
53 }
54
55+ /// Generates a "404 not found" response.
56+ fn not_found(
57+ &mut self,
58+ _req: Request<Body>,
59+ ) -> Result<Response<Body>, Error> {
60+ Ok(Response::builder()
61+ .status(StatusCode::NOT_FOUND)
62+ .body("Not found".into())?)
63+ }
64+
65+ /// Handles `/api` routes.
66+ fn api(
67+ &mut self,
68+ req: Request<Body>,
69+ ) -> impl Future<Item = Response<Body>, Error = Error> {
70+ let mut parts = req.uri().path().split('/').filter(|s| !s.is_empty());
71+
72+ // Skip over the '/api' part.
73+ assert_eq!(parts.next(), Some("api"));
74+
75+ // Extract the namespace.
76+ let namespace = match (parts.next(), parts.next()) {
77+ (Some(org), Some(project)) => {
78+ Namespace::new(org.into(), project.into())
79+ }
80+ _ => {
81+ return Response::builder()
82+ .status(StatusCode::BAD_REQUEST)
83+ .body(Body::from("Missing org/project in URL"))
84+ .map_err(Into::into)
85+ .response();
86+ }
87+ };
88+
89+ match parts.next() {
90+ Some("object") => {
91+ // Upload or download a single object.
92+ let oid = parts.next().and_then(|x| x.parse::<lfs::Oid>().ok());
93+ let oid = match oid {
94+ Some(oid) => oid,
95+ None => {
96+ return Response::builder()
97+ .status(StatusCode::BAD_REQUEST)
98+ .body(Body::from("Missing OID parameter."))
99+ .map_err(Into::into)
100+ .response();
101+ }
102+ };
103+
104+ let key = StorageKey::new(oid).with_namespace(namespace);
105+
106+ match *req.method() {
107+ Method::GET => self.download(req, key).response(),
108+ Method::PUT => self.upload(req, key).response(),
109+ _ => self.not_found(req).response(),
110+ }
111+ }
112+ Some("objects") => match (req.method(), parts.next()) {
113+ (&Method::POST, Some("batch")) => {
114+ self.batch(req, namespace).response()
115+ }
116+ (&Method::POST, Some("verify")) => {
117+ self.verify(req, namespace).response()
118+ }
119+ _ => self.not_found(req).response(),
120+ },
121+ _ => self.not_found(req).response(),
122+ }
123+ }
124+
125 /// Downloads a single LFS object.
126 fn download(
127 &mut self,
128 _req: Request<Body>,
129- oid: lfs::Oid,
130+ key: StorageKey,
131 ) -> impl Future<Item = Response<Body>, Error = Error> {
132- self.state.storage.get(&oid).from_err::<Error>().and_then(
133+ self.state.storage.get(&key).from_err::<Error>().and_then(
134 move |object| -> Result<_, Error> {
135 if let Some(object) = object {
136 return Response::builder()
137 @@ -95,7 +165,7 @@ where
138 fn upload(
139 &mut self,
140 req: Request<Body>,
141- oid: lfs::Oid,
142+ key: StorageKey,
143 ) -> <Self as Service>::Future {
144 let len = req
145 .headers()
146 @@ -128,7 +198,7 @@ where
147 Box::new(
148 self.state
149 .storage
150- .put(&oid, object)
151+ .put(key, object)
152 .from_err::<Error>()
153 .and_then(|_| {
154 Response::builder()
155 @@ -143,13 +213,16 @@ where
156 fn verify(
157 &mut self,
158 req: Request<Body>,
159+ namespace: Namespace,
160 ) -> impl Future<Item = Response<Body>, Error = Error> {
161 let state = self.state.clone();
162
163 req.into_body()
164 .into_json()
165 .and_then(move |val: lfs::VerifyRequest| {
166- state.storage.size(&val.oid).from_err::<Error>().and_then(
167+ let key = StorageKey::new(val.oid).with_namespace(namespace);
168+
169+ state.storage.size(&key).from_err::<Error>().and_then(
170 move |size| {
171 if let Some(size) = size {
172 if size == val.size {
173 @@ -170,16 +243,6 @@ where
174 })
175 }
176
177- /// Generates a "404 not found" response.
178- fn not_found(
179- &mut self,
180- _req: Request<Body>,
181- ) -> Result<Response<Body>, Error> {
182- Ok(Response::builder()
183- .status(StatusCode::NOT_FOUND)
184- .body("Not found".into())?)
185- }
186-
187 /// Batch API endpoint for the Git LFS server spec.
188 ///
189 /// See also:
190 @@ -187,6 +250,7 @@ where
191 fn batch(
192 &mut self,
193 req: Request<Body>,
194+ namespace: Namespace,
195 ) -> impl Future<Item = Response<Body>, Error = Error> {
196 // Get the host name and scheme.
197 let uri = Uri::builder()
198 @@ -213,13 +277,16 @@ where
199 val.objects.into_iter().map(move |object| {
200 let uri = uri.clone();
201
202- state.storage.size(&object.oid).map(
203- move |size| {
204- basic_response(
205- uri, object, operation, size,
206- )
207- },
208- )
209+ let key = StorageKey::new(object.oid)
210+ .with_namespace(namespace.clone());
211+
212+ let namespace = namespace.clone();
213+
214+ state.storage.size(&key).map(move |size| {
215+ basic_response(
216+ uri, object, operation, size, namespace,
217+ )
218+ })
219 });
220
221 Either::A(
222 @@ -268,6 +335,7 @@ fn basic_response(
223 object: lfs::RequestObject,
224 op: lfs::Operation,
225 size: Option<u64>,
226+ namespace: Namespace,
227 ) -> lfs::ResponseObject {
228 if let Some(size) = size {
229 // Ensure that the client and server agree on the size of the object.
230 @@ -288,7 +356,7 @@ fn basic_response(
231 }
232 }
233
234- let href = format!("{}object/{}", uri, object.oid);
235+ let href = format!("{}api/{}/object/{}", uri, namespace, object.oid);
236
237 let action = lfs::Action {
238 href,
239 @@ -320,7 +388,10 @@ fn basic_response(
240 download: None,
241 upload: Some(action.clone()),
242 verify: Some(lfs::Action {
243- href: format!("{}objects/verify", uri),
244+ href: format!(
245+ "{}api/{}/objects/verify",
246+ uri, namespace
247+ ),
248 header: None,
249 expires_in: None,
250 expires_at: None,
251 @@ -373,51 +444,8 @@ where
252 fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
253 let req = into_request(req);
254
255- if req.uri().path().starts_with("/object/") {
256- // Extract the OID from the request.
257- let mut parts =
258- req.uri().path().split('/').filter(|s| !s.is_empty());
259-
260- // Skip over the 'object/' part.
261- parts.next();
262-
263- let oid = match parts.next() {
264- Some(oid) => oid,
265- None => {
266- return Response::builder()
267- .status(StatusCode::BAD_REQUEST)
268- .body(Body::from("Missing OID parameter."))
269- .map_err(Into::into)
270- .response();
271- }
272- };
273-
274- let oid = match oid.parse::<lfs::Oid>() {
275- Ok(oid) => oid,
276- Err(err) => {
277- return Response::builder()
278- .status(StatusCode::BAD_REQUEST)
279- .body(Body::from(format!("Invalid OID: {}", err)))
280- .map_err(Into::into)
281- .response();
282- }
283- };
284-
285- match *req.method() {
286- Method::GET => return self.download(req, oid).response(),
287- Method::PUT => return self.upload(req, oid),
288- _ => return self.not_found(req).response(),
289- }
290- }
291-
292- match (req.method(), req.uri().path()) {
293- (&Method::POST, "/objects/batch") => {
294- return self.batch(req).response();
295- }
296- (&Method::POST, "/objects/verify") => {
297- return self.verify(req).response();
298- }
299- _ => {}
300+ if req.uri().path().starts_with("/api/") {
301+ return self.api(req).response();
302 }
303
304 Response::builder()
305 diff --git a/src/main.rs b/src/main.rs
306index f1b4754..2149586 100644
307--- a/src/main.rs
308+++ b/src/main.rs
309 @@ -61,6 +61,10 @@ struct Args {
310 #[structopt(long = "s3-bucket")]
311 s3_bucket: String,
312
313+ /// Amazon S3 path prefix to use.
314+ #[structopt(long = "s3-prefix", default_value = "lfs")]
315+ s3_prefix: String,
316+
317 /// Encryption key to use.
318 #[structopt(long = "key", parse(try_from_str = "FromHex::from_hex"))]
319 key: [u8; 32],
320 @@ -94,7 +98,7 @@ impl Args {
321
322 // Initialize our storage backends.
323 let disk = Disk::new(self.cache_dir).map_err(Error::from);
324- let s3 = S3::new(self.s3_bucket, "lfs".into()).map_err(Error::from);
325+ let s3 = S3::new(self.s3_bucket, self.s3_prefix).map_err(Error::from);
326 let storage = disk
327 .join(s3)
328 .and_then(move |(disk, s3)| {
329 diff --git a/src/storage/cached.rs b/src/storage/cached.rs
330index 81012b6..e4e1a64 100644
331--- a/src/storage/cached.rs
332+++ b/src/storage/cached.rs
333 @@ -29,7 +29,7 @@ use tokio;
334 use crate::lfs::Oid;
335 use crate::lru::Cache;
336
337- use super::{LFSObject, Storage, StorageFuture, StorageStream};
338+ use super::{LFSObject, Storage, StorageFuture, StorageKey, StorageStream};
339
340 #[derive(Debug)]
341 pub enum Error<C, S> {
342 @@ -113,9 +113,6 @@ where
343
344 /// Returns a future that prunes the least recently used entries that cause the
345 /// storage to exceed the given maximum size.
346- ///
347- /// If the stream is thrown away, then the items will be removed from the
348- /// in-memory LRU cache, but not physically deleted.
349 fn prune_cache<S>(
350 lru: &mut Cache,
351 max_size: u64,
352 @@ -138,13 +135,14 @@ where
353 }
354
355 Either::B(stream::iter_ok(to_delete).fold(0, move |acc, oid| {
356- storage.delete(&oid).map(move |()| acc + 1)
357+ let key = StorageKey::new(oid);
358+ storage.delete(&key).map(move |()| acc + 1)
359 }))
360 }
361
362 fn cache_and_prune<C>(
363 cache: Arc<C>,
364- key: Oid,
365+ oid: Oid,
366 obj: LFSObject,
367 lru: Arc<Mutex<Cache>>,
368 max_size: u64,
369 @@ -155,12 +153,12 @@ where
370 let len = obj.len();
371
372 cache
373- .put(&key, obj)
374+ .put(StorageKey::new(oid), obj)
375 .and_then(move |()| {
376 // Add the object info to our LRU cache once the download from
377 // permanent storage is complete.
378 let mut lru = lru.lock().unwrap();
379- lru.push(key, len);
380+ lru.push(oid, len);
381
382 // Prune the cache.
383 prune_cache(&mut lru, max_size, cache).map(move |count| {
384 @@ -170,7 +168,7 @@ where
385 })
386 })
387 .map_err(move |err| {
388- log::error!("Error caching {} ({})", key, err);
389+ log::error!("Error caching {} ({})", oid, err);
390 })
391 }
392
393 @@ -185,21 +183,21 @@ where
394
395 /// Tries to query the cache first. If that fails, falls back to the
396 /// permanent storage backend.
397- fn get(&self, key: &Oid) -> StorageFuture<Option<LFSObject>, Self::Error> {
398+ fn get(
399+ &self,
400+ key: &StorageKey,
401+ ) -> StorageFuture<Option<LFSObject>, Self::Error> {
402 // TODO: Keep stats on cache hits and misses. We can then display those
403 // stats on a web page or send them to another service such as
404 // Prometheus.
405- let key = *key;
406-
407 if let Ok(mut lru) = self.lru.lock() {
408- if lru.get_refresh(&key).is_some() {
409+ if lru.get_refresh(key.oid()).is_some() {
410 // Cache hit!
411- //
412- // TODO: Verify the stream as we send it back. If the SHA256 is
413- // incorrect, delete it and let the client try again.
414 let storage = self.storage.clone();
415 let lru2 = self.lru.clone();
416
417+ let key = StorageKey::new(*key.oid());
418+
419 return Box::new(
420 self.cache.get(&key).map_err(Error::from_cache).and_then(
421 move |obj| match obj {
422 @@ -209,7 +207,7 @@ where
423 // the entry from our LRU. This can happen if
424 // the cache is cleared out manually.
425 let mut lru = lru2.lock().unwrap();
426- lru.remove(&key);
427+ lru.remove(key.oid());
428
429 // Fall back to permanent storage. Note that
430 // this won't actually cache the object. This
431 @@ -232,6 +230,7 @@ where
432 let lru = self.lru.clone();
433 let max_size = self.max_size;
434 let cache = self.cache.clone();
435+ let oid = *key.oid();
436
437 Box::new(
438 self.storage
439 @@ -248,7 +247,7 @@ where
440 // out of disk space, the server should still continue
441 // operating.
442 tokio::spawn(cache_and_prune(
443- cache, key, b, lru, max_size,
444+ cache, oid, b, lru, max_size,
445 ));
446
447 // Send the object from permanent-storage.
448 @@ -268,10 +267,9 @@ where
449
450 fn put(
451 &self,
452- key: &Oid,
453+ key: StorageKey,
454 value: LFSObject,
455 ) -> StorageFuture<(), Self::Error> {
456- let key = *key;
457 let lru = self.lru.clone();
458 let max_size = self.max_size;
459 let cache = self.cache.clone();
460 @@ -282,16 +280,19 @@ where
461 // shouldn't prevent the client from uploading the LFS object to
462 // permanent storage. For example, even if we run out of disk space, the
463 // server should still continue operating.
464- tokio::spawn(cache_and_prune(cache, key, b, lru, max_size));
465+ tokio::spawn(cache_and_prune(cache, *key.oid(), b, lru, max_size));
466
467- Box::new(self.storage.put(&key, a).map_err(Error::from_storage))
468+ Box::new(self.storage.put(key, a).map_err(Error::from_storage))
469 }
470
471- fn size(&self, key: &Oid) -> StorageFuture<Option<u64>, Self::Error> {
472+ fn size(
473+ &self,
474+ key: &StorageKey,
475+ ) -> StorageFuture<Option<u64>, Self::Error> {
476 // Get just the size of an object without perturbing the LRU ordering.
477 // Only downloads or uploads need to perturb the LRU ordering.
478 let lru = self.lru.lock().unwrap();
479- if let Some(size) = lru.get(key) {
480+ if let Some(size) = lru.get(key.oid()) {
481 // Cache hit!
482 Box::new(future::ok(Some(size)))
483 } else {
484 @@ -301,15 +302,16 @@ where
485 }
486
487 /// Deletes an item from the cache (not from permanent storage).
488- fn delete(&self, key: &Oid) -> StorageFuture<(), Self::Error> {
489+ fn delete(&self, key: &StorageKey) -> StorageFuture<(), Self::Error> {
490 // Only ever delete items from the cache. This may be called when
491 // a corrupted object is detected.
492- Box::new(self.cache.delete(key).map_err(Error::from_cache))
493+ let key = StorageKey::new(*key.oid());
494+ Box::new(self.cache.delete(&key).map_err(Error::from_cache))
495 }
496
497 /// Returns a stream of cached items.
498 fn list(&self) -> StorageStream<(Oid, u64), Self::Error> {
499- // TODO: Use the internal linked hash map instead to get this list.
500+ // TODO: Use the LRU instead to get this list.
501 Box::new(self.cache.list().map_err(Error::from_cache))
502 }
503
504 diff --git a/src/storage/disk.rs b/src/storage/disk.rs
505index 7a7d47f..f3c1dda 100644
506--- a/src/storage/disk.rs
507+++ b/src/storage/disk.rs
508 @@ -31,7 +31,7 @@ use tokio::{
509 };
510 use uuid::Uuid;
511
512- use super::{LFSObject, Storage, StorageFuture, StorageStream};
513+ use super::{LFSObject, Storage, StorageFuture, StorageKey, StorageStream};
514 use crate::lfs::Oid;
515
516 pub struct Backend {
517 @@ -46,15 +46,26 @@ impl Backend {
518
519 // Use sub directories in order to better utilize the file system's internal
520 // tree data structure.
521- fn key_to_path(&self, oid: &Oid) -> PathBuf {
522- self.root.join(format!("objects/{}", oid.path()))
523+ fn key_to_path(&self, key: &StorageKey) -> PathBuf {
524+ if let Some(namespace) = key.namespace() {
525+ self.root.join(format!(
526+ "objects/{}/{}",
527+ namespace,
528+ key.oid().path()
529+ ))
530+ } else {
531+ self.root.join(format!("objects/{}", key.oid().path()))
532+ }
533 }
534 }
535
536 impl Storage for Backend {
537 type Error = io::Error;
538
539- fn get(&self, key: &Oid) -> StorageFuture<Option<LFSObject>, Self::Error> {
540+ fn get(
541+ &self,
542+ key: &StorageKey,
543+ ) -> StorageFuture<Option<LFSObject>, Self::Error> {
544 Box::new(
545 fs::File::open(self.key_to_path(key))
546 .and_then(fs::File::metadata)
547 @@ -80,10 +91,10 @@ impl Storage for Backend {
548
549 fn put(
550 &self,
551- key: &Oid,
552+ key: StorageKey,
553 value: LFSObject,
554 ) -> StorageFuture<(), Self::Error> {
555- let path = self.key_to_path(key);
556+ let path = self.key_to_path(&key);
557 let dir = path.parent().unwrap().to_path_buf();
558
559 let incomplete = self.root.join("incomplete");
560 @@ -101,7 +112,10 @@ impl Storage for Backend {
561 )
562 }
563
564- fn size(&self, key: &Oid) -> StorageFuture<Option<u64>, Self::Error> {
565+ fn size(
566+ &self,
567+ key: &StorageKey,
568+ ) -> StorageFuture<Option<u64>, Self::Error> {
569 let path = self.key_to_path(key);
570
571 Box::new(
572 @@ -114,7 +128,7 @@ impl Storage for Backend {
573 )
574 }
575
576- fn delete(&self, key: &Oid) -> StorageFuture<(), Self::Error> {
577+ fn delete(&self, key: &StorageKey) -> StorageFuture<(), Self::Error> {
578 Box::new(fs::remove_file(self.key_to_path(key)).or_else(move |err| {
579 match err.kind() {
580 io::ErrorKind::NotFound => Ok(()),
581 diff --git a/src/storage/encrypt.rs b/src/storage/encrypt.rs
582index 2e7394f..073dbb0 100644
583--- a/src/storage/encrypt.rs
584+++ b/src/storage/encrypt.rs
585 @@ -25,7 +25,7 @@ use futures::{Future, Stream};
586
587 use crate::lfs::Oid;
588
589- use super::{LFSObject, Storage, StorageFuture, StorageStream};
590+ use super::{LFSObject, Storage, StorageFuture, StorageKey, StorageStream};
591
592 /// A storage adaptor that encrypts/decrypts all data that passes through.
593 pub struct Backend<S> {
594 @@ -67,10 +67,13 @@ where
595 {
596 type Error = S::Error;
597
598- fn get(&self, key: &Oid) -> StorageFuture<Option<LFSObject>, Self::Error> {
599+ fn get(
600+ &self,
601+ key: &StorageKey,
602+ ) -> StorageFuture<Option<LFSObject>, Self::Error> {
603 // Use the first part of the SHA256 as the nonce.
604 let mut nonce: [u8; 24] = [0; 24];
605- nonce.copy_from_slice(&key.bytes()[0..24]);
606+ nonce.copy_from_slice(&key.oid().bytes()[0..24]);
607
608 let chacha = ChaCha::new_xchacha20(&self.key, &nonce);
609
610 @@ -88,12 +91,12 @@ where
611
612 fn put(
613 &self,
614- key: &Oid,
615+ key: StorageKey,
616 value: LFSObject,
617 ) -> StorageFuture<(), Self::Error> {
618 // Use the first part of the SHA256 as the nonce.
619 let mut nonce: [u8; 24] = [0; 24];
620- nonce.copy_from_slice(&key.bytes()[0..24]);
621+ nonce.copy_from_slice(&key.oid().bytes()[0..24]);
622
623 let chacha = ChaCha::new_xchacha20(&self.key, &nonce);
624
625 @@ -103,11 +106,14 @@ where
626 self.storage.put(key, LFSObject::new(len, Box::new(stream)))
627 }
628
629- fn size(&self, key: &Oid) -> StorageFuture<Option<u64>, Self::Error> {
630+ fn size(
631+ &self,
632+ key: &StorageKey,
633+ ) -> StorageFuture<Option<u64>, Self::Error> {
634 self.storage.size(key)
635 }
636
637- fn delete(&self, key: &Oid) -> StorageFuture<(), Self::Error> {
638+ fn delete(&self, key: &StorageKey) -> StorageFuture<(), Self::Error> {
639 self.storage.delete(key)
640 }
641
642 diff --git a/src/storage/mod.rs b/src/storage/mod.rs
643index 2e5407a..772222f 100644
644--- a/src/storage/mod.rs
645+++ b/src/storage/mod.rs
646 @@ -48,6 +48,78 @@ pub type StorageStream<T, E> = Box<dyn Stream<Item = T, Error = E> + Send>;
647 /// The byte stream of an LFS object.
648 pub type ByteStream = Box<dyn Stream<Item = Bytes, Error = io::Error> + Send>;
649
650+ /// A namespace is used to categorize stored LFS objects. The storage
651+ /// implementation is free to ignore this. However, it can be useful when
652+ /// pruning old LFS objects from permanent storage.
653+ #[derive(Clone)]
654+ pub struct Namespace {
655+ org: String,
656+ project: String,
657+ }
658+
659+ impl Namespace {
660+ pub fn new(org: String, project: String) -> Self {
661+ Namespace { org, project }
662+ }
663+
664+ #[allow(unused)]
665+ pub fn split(self) -> (String, String) {
666+ (self.org, self.project)
667+ }
668+
669+ pub fn org(&self) -> &str {
670+ &self.org
671+ }
672+
673+ pub fn project(&self) -> &str {
674+ &self.project
675+ }
676+ }
677+
678+ impl fmt::Display for Namespace {
679+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
680+ write!(f, "{}/{}", self.org(), self.project())
681+ }
682+ }
683+
684+ /// A key into storage.
685+ #[derive(Clone)]
686+ pub struct StorageKey {
687+ /// The object ID.
688+ oid: Oid,
689+
690+ /// The namespace is optional. Storage backends that act as a cache may not
691+ /// want to use this.
692+ namespace: Option<Namespace>,
693+ }
694+
695+ impl StorageKey {
696+ pub fn new(oid: Oid) -> Self {
697+ StorageKey {
698+ oid,
699+ namespace: None,
700+ }
701+ }
702+
703+ pub fn with_namespace(mut self, namespace: Namespace) -> Self {
704+ self.namespace = Some(namespace);
705+ self
706+ }
707+
708+ #[allow(unused)]
709+ pub fn into_parts(self) -> (Oid, Option<Namespace>) {
710+ (self.oid, self.namespace)
711+ }
712+
713+ pub fn oid(&self) -> &Oid {
714+ &self.oid
715+ }
716+
717+ pub fn namespace(&self) -> &Option<Namespace> {
718+ &self.namespace
719+ }
720+ }
721+
722 /// An LFS object to be uploaded or downloaded.
723 pub struct LFSObject {
724 /// Size of the object.
725 @@ -117,24 +189,28 @@ pub trait Storage {
726 type Error: fmt::Display + Send;
727
728 /// Gets an entry from the storage medium.
729- fn get(&self, key: &Oid) -> StorageFuture<Option<LFSObject>, Self::Error>;
730+ fn get(
731+ &self,
732+ key: &StorageKey,
733+ ) -> StorageFuture<Option<LFSObject>, Self::Error>;
734
735 /// Sets an entry in the storage medium.
736 fn put(
737 &self,
738- key: &Oid,
739+ key: StorageKey,
740 value: LFSObject,
741 ) -> StorageFuture<(), Self::Error>;
742
743 /// Gets the size of the object. Returns `None` if the object does not
744 /// exist.
745- fn size(&self, key: &Oid) -> StorageFuture<Option<u64>, Self::Error>;
746+ fn size(&self, key: &StorageKey)
747+ -> StorageFuture<Option<u64>, Self::Error>;
748
749 /// Deletes an object.
750 ///
751 /// Permanent storage backends may choose to never delete objects, always
752 /// returning success.
753- fn delete(&self, key: &Oid) -> StorageFuture<(), Self::Error>;
754+ fn delete(&self, key: &StorageKey) -> StorageFuture<(), Self::Error>;
755
756 /// Returns a stream of all the object IDs in the storage medium.
757 fn list(&self) -> StorageStream<(Oid, u64), Self::Error>;
758 @@ -159,23 +235,29 @@ where
759 {
760 type Error = S::Error;
761
762- fn get(&self, key: &Oid) -> StorageFuture<Option<LFSObject>, Self::Error> {
763+ fn get(
764+ &self,
765+ key: &StorageKey,
766+ ) -> StorageFuture<Option<LFSObject>, Self::Error> {
767 (**self).get(key)
768 }
769
770 fn put(
771 &self,
772- key: &Oid,
773+ key: StorageKey,
774 value: LFSObject,
775 ) -> StorageFuture<(), Self::Error> {
776 (**self).put(key, value)
777 }
778
779- fn size(&self, key: &Oid) -> StorageFuture<Option<u64>, Self::Error> {
780+ fn size(
781+ &self,
782+ key: &StorageKey,
783+ ) -> StorageFuture<Option<u64>, Self::Error> {
784 (**self).size(key)
785 }
786
787- fn delete(&self, key: &Oid) -> StorageFuture<(), Self::Error> {
788+ fn delete(&self, key: &StorageKey) -> StorageFuture<(), Self::Error> {
789 (**self).delete(key)
790 }
791
792 diff --git a/src/storage/s3.rs b/src/storage/s3.rs
793index def688c..0860d3b 100644
794--- a/src/storage/s3.rs
795+++ b/src/storage/s3.rs
796 @@ -28,7 +28,7 @@ use rusoto_s3::{
797 S3Client, StreamingBody, S3,
798 };
799
800- use super::{LFSObject, Storage, StorageFuture, StorageStream};
801+ use super::{LFSObject, Storage, StorageFuture, StorageKey, StorageStream};
802
803 use crate::lfs::Oid;
804
805 @@ -91,8 +91,15 @@ pub struct Backend<C = S3Client> {
806 impl Backend {
807 pub fn new(
808 bucket: String,
809- prefix: String,
810+ mut prefix: String,
811 ) -> impl Future<Item = Self, Error = Error> {
812+ // Ensure the prefix doesn't end with a '/'.
813+ while prefix.ends_with('/') {
814+ prefix.pop();
815+ }
816+
 817+ // `Region::default` will try to get the region from the environment
 818+ // and fall back to a default if it isn't found.
819 Backend::with_client(S3Client::new(Region::default()), bucket, prefix)
820 }
821 }
822 @@ -124,8 +131,12 @@ where
823 })
824 }
825
826- fn key_to_path(&self, oid: &Oid) -> String {
827- format!("{}/{}", self.prefix, oid.path())
828+ fn key_to_path(&self, key: &StorageKey) -> String {
829+ if let Some(namespace) = key.namespace() {
830+ format!("{}/{}/{}", self.prefix, namespace, key.oid().path())
831+ } else {
832+ format!("{}/{}", self.prefix, key.oid().path())
833+ }
834 }
835 }
836
837 @@ -135,7 +146,10 @@ where
838 {
839 type Error = Error;
840
841- fn get(&self, key: &Oid) -> StorageFuture<Option<LFSObject>, Self::Error> {
842+ fn get(
843+ &self,
844+ key: &StorageKey,
845+ ) -> StorageFuture<Option<LFSObject>, Self::Error> {
846 let request = GetObjectRequest {
847 bucket: self.bucket.clone(),
848 key: self.key_to_path(key),
849 @@ -165,12 +179,9 @@ where
850
851 fn put(
852 &self,
853- key: &Oid,
854+ key: StorageKey,
855 value: LFSObject,
856 ) -> StorageFuture<(), Self::Error> {
857- // TODO:
858- // * Put an upper limit on the amount of data that can be uploaded.
859- // * Encrypt the data with a private key(?)
860 let (len, stream) = value.into_parts();
861
862 let stream = stream.map(|chunk| {
863 @@ -182,7 +193,7 @@ where
864
865 let request = PutObjectRequest {
866 bucket: self.bucket.clone(),
867- key: self.key_to_path(key),
868+ key: self.key_to_path(&key),
869 content_length: Some(len as i64),
870 body: Some(StreamingBody::new(stream)),
871 ..Default::default()
872 @@ -191,7 +202,10 @@ where
873 Box::new(self.client.put_object(request).map(|_| ()).from_err())
874 }
875
876- fn size(&self, key: &Oid) -> StorageFuture<Option<u64>, Self::Error> {
877+ fn size(
878+ &self,
879+ key: &StorageKey,
880+ ) -> StorageFuture<Option<u64>, Self::Error> {
881 let request = HeadObjectRequest {
882 bucket: self.bucket.clone(),
883 key: self.key_to_path(key),
884 @@ -228,8 +242,9 @@ where
885 )
886 }
887
888- /// This never deletes objects from S3 and always returns success.
889- fn delete(&self, _key: &Oid) -> StorageFuture<(), Self::Error> {
890+ /// This never deletes objects from S3 and always returns success. This may
891+ /// be changed in the future.
892+ fn delete(&self, _key: &StorageKey) -> StorageFuture<(), Self::Error> {
893 Box::new(future::ok(()))
894 }
895
896 diff --git a/src/storage/verify.rs b/src/storage/verify.rs
897index 32ac44f..0793b61 100644
898--- a/src/storage/verify.rs
899+++ b/src/storage/verify.rs
900 @@ -29,7 +29,7 @@ use futures::{
901 use crate::lfs::Oid;
902 use crate::sha256::{Sha256VerifyError, VerifyStream};
903
904- use super::{LFSObject, Storage, StorageFuture, StorageStream};
905+ use super::{LFSObject, Storage, StorageFuture, StorageKey, StorageStream};
906
907 #[derive(Debug, Display, From)]
908 enum Error {
909 @@ -73,8 +73,11 @@ where
910 /// from storage if it is corrupted. If storage backends are composed
911 /// correctly, then it should only delete cache storage (not permanent
912 /// storage).
913- fn get(&self, key: &Oid) -> StorageFuture<Option<LFSObject>, Self::Error> {
914- let key = *key;
915+ fn get(
916+ &self,
917+ key: &StorageKey,
918+ ) -> StorageFuture<Option<LFSObject>, Self::Error> {
919+ let key = key.clone();
920
921 let storage = self.storage.clone();
922
923 @@ -82,25 +85,28 @@ where
924 Some(obj) => {
925 let (len, stream) = obj.into_parts();
926
927- let stream =
928- VerifyStream::new(stream.map_err(Error::from), len, key)
929- .or_else(move |err| match err {
930- Error::Verify(err) => {
931- log::error!(
932- "Deleting corrupted object {} ({})",
933- key,
934- err
935- );
936-
937- Either::A(storage.delete(&key).then(|_| {
938- Err(io::Error::new(
939- io::ErrorKind::Other,
940- "found corrupted object",
941- ))
942- }))
943- }
944- Error::Io(err) => Either::B(future::err(err)),
945- });
946+ let stream = VerifyStream::new(
947+ stream.map_err(Error::from),
948+ len,
949+ *key.oid(),
950+ )
951+ .or_else(move |err| match err {
952+ Error::Verify(err) => {
953+ log::error!(
954+ "Deleting corrupted object {} ({})",
955+ key.oid(),
956+ err
957+ );
958+
959+ Either::A(storage.delete(&key).then(|_| {
960+ Err(io::Error::new(
961+ io::ErrorKind::Other,
962+ "found corrupted object",
963+ ))
964+ }))
965+ }
966+ Error::Io(err) => Either::B(future::err(err)),
967+ });
968
969 Some(LFSObject::new(len, Box::new(stream)))
970 }
971 @@ -110,25 +116,31 @@ where
972
973 fn put(
974 &self,
975- key: &Oid,
976+ key: StorageKey,
977 value: LFSObject,
978 ) -> StorageFuture<(), Self::Error> {
979 let (len, stream) = value.into_parts();
980
981- let stream = VerifyStream::new(stream.map_err(Error::from), len, *key)
982- .map_err(move |err| match err {
983- Error::Verify(err) => io::Error::new(io::ErrorKind::Other, err),
984- Error::Io(err) => io::Error::new(io::ErrorKind::Other, err),
985- });
986+ let stream =
987+ VerifyStream::new(stream.map_err(Error::from), len, *key.oid())
988+ .map_err(move |err| match err {
989+ Error::Verify(err) => {
990+ io::Error::new(io::ErrorKind::Other, err)
991+ }
992+ Error::Io(err) => io::Error::new(io::ErrorKind::Other, err),
993+ });
994
995 self.storage.put(key, LFSObject::new(len, Box::new(stream)))
996 }
997
998- fn size(&self, key: &Oid) -> StorageFuture<Option<u64>, Self::Error> {
999+ fn size(
1000+ &self,
1001+ key: &StorageKey,
1002+ ) -> StorageFuture<Option<u64>, Self::Error> {
1003 self.storage.size(key)
1004 }
1005
1006- fn delete(&self, key: &Oid) -> StorageFuture<(), Self::Error> {
1007+ fn delete(&self, key: &StorageKey) -> StorageFuture<(), Self::Error> {
1008 self.storage.delete(key)
1009 }
1010