Author: Jason White [jwhite@esri.com]
Hash: 67a6d6e9138936333bfac463f9dfc5eaeac9ccee
Timestamp: Thu, 02 May 2019 23:35:29 +0000 (5 years ago)

+481 -57 +/-8 browse
Fix cache ending up with incomplete objects
Fix cache ending up with incomplete objects

Also adds fault injection to help track down rare bugs related to errors
in backend services.

Fixes #5.
1diff --git a/Cargo.lock b/Cargo.lock
2index 2203d3c..1b89dff 100644
3--- a/Cargo.lock
4+++ b/Cargo.lock
5 @@ -1047,6 +1047,7 @@ dependencies = [
6 "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
7 "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
8 "pretty_env_logger 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
9+ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
10 "rusoto_core 0.37.0 (registry+https://github.com/rust-lang/crates.io-index)",
11 "rusoto_s3 0.37.0 (registry+https://github.com/rust-lang/crates.io-index)",
12 "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
13 diff --git a/Cargo.toml b/Cargo.toml
14index 90881b0..3e6379c 100644
15--- a/Cargo.toml
16+++ b/Cargo.toml
17 @@ -37,6 +37,7 @@ structopt = "0.2"
18 tokio = "0.1"
19 url = "1"
20 uuid = { version = "0.7", features = ["v4"] }
21+ rand = { version = "0.6", optional = true }
22
23 [dependencies.rusoto_core]
24 version = "0.37"
25 @@ -47,3 +48,9 @@ features = ["rustls"]
26 version = "0.37"
27 default_features = false
28 features = ["rustls"]
29+
30+ [features]
31+ default = []
32+ # If the "faulty" feature is enabled, random failures are injected into the byte
33+ # stream.
34+ faulty = ["rand"]
35 diff --git a/src/main.rs b/src/main.rs
36index 009dfc2..d8061ad 100644
37--- a/src/main.rs
38+++ b/src/main.rs
39 @@ -25,6 +25,7 @@ mod logger;
40 mod lru;
41 mod sha256;
42 mod storage;
43+ mod util;
44
45 use std::net::{SocketAddr, ToSocketAddrs};
46 use std::path::PathBuf;
47 @@ -43,6 +44,9 @@ use crate::error::Error;
48 use crate::logger::Logger;
49 use crate::storage::{Cached, Disk, Encrypted, Retrying, Verify, S3};
50
51+ #[cfg(feature = "faulty")]
52+ use crate::storage::Faulty;
53+
54 #[derive(StructOpt)]
55 struct Args {
56 /// Host or address to listen on.
57 @@ -102,8 +106,17 @@ impl Args {
58 let storage = disk
59 .join(s3)
60 .and_then(move |(disk, s3)| {
61+ // Retry certain operations to S3 to make it more reliable.
62+ let s3 = Retrying::new(s3);
63+
64+ // Add a little instability for testing purposes.
65+ #[cfg(feature = "faulty")]
66+ let s3 = Faulty::new(s3);
67+ #[cfg(feature = "faulty")]
68+ let disk = Faulty::new(disk);
69+
70 // Use the disk as a cache.
71- Cached::new(max_cache_size, disk, Retrying::new(s3)).from_err()
72+ Cached::new(max_cache_size, disk, s3).from_err()
73 })
74 .map(move |storage| {
75 // Verify object SHA256s as they are uploaded and downloaded.
76 diff --git a/src/storage/cached.rs b/src/storage/cached.rs
77index 1b5620b..fec2572 100644
78--- a/src/storage/cached.rs
79+++ b/src/storage/cached.rs
80 @@ -21,9 +21,12 @@ use std::fmt;
81 use std::io;
82 use std::sync::{Arc, Mutex};
83
84+ use bytes::Bytes;
85 use futures::{
86 future::{self, Either},
87- stream, Future, Stream,
88+ stream,
89+ sync::oneshot,
90+ Future, Stream,
91 };
92 use tokio;
93
94 @@ -250,24 +253,33 @@ where
95 .and_then(move |obj| match obj {
96 Some(obj) => {
97 // Cache the returned LFS object.
98- let (a, b) = obj.split();
99+ let (f, a, b) = obj.fanout();
100
101 // Cache the object in the background. Whether or not
102 // this succeeds shouldn't prevent the client from
103 // getting the LFS object. For example, even if we run
104 // out of disk space, the server should still continue
105 // operating.
106+ let cache = cache_and_prune(
107+ cache,
108+ key.clone(),
109+ b,
110+ lru,
111+ max_size,
112+ )
113+ .map_err(Error::from_cache);
114+
115 tokio::spawn(
116- cache_and_prune(
117- cache,
118- key.clone(),
119- b,
120- lru,
121- max_size,
122- )
123- .map_err(move |err| {
124- log::error!("Error caching {} ({})", key, err);
125- }),
126+ f.map_err(Error::from_stream)
127+ .join(cache)
128+ .map(|((), ())| ())
129+ .map_err(move |err: Self::Error| {
130+ log::error!(
131+ "Error caching {} ({})",
132+ key,
133+ err
134+ );
135+ }),
136 );
137
138 // Send the object from permanent-storage.
139 @@ -296,9 +308,44 @@ where
140
141 let (f, a, b) = value.fanout();
142
143- let cache = cache_and_prune(cache, key.clone(), b, lru, max_size)
144- .map_err(Error::from_cache);
145- let store = self.storage.put(key, a).map_err(Error::from_storage);
146+ // Note: We can only cache an object if it is successfully uploaded to
147+ // the store. Thus, we do something clever with this one shot channel.
148+ //
149+ // When the permanent storage finishes receiving its LFS object, we send
150+ // a signal to be received by an empty chunk at the end of the stream
151+ // going to the cache. Then, the cache only receives its last (empty)
152+ // chunk when the LFS object has been successfully stored.
153+ let (signal_sender, signal_receiver) = oneshot::channel();
154+
155+ let store = self
156+ .storage
157+ .put(key.clone(), a)
158+ .map(move |()| {
159+ // Send a signal to the cache so that it can complete its write.
160+ signal_sender.send(()).unwrap_or(())
161+ })
162+ .map_err(Error::from_storage);
163+
164+ let (len, stream) = b.into_parts();
165+
166+ // Add an empty chunk to the end of the stream whose only job is to
167+ // complete when it receives a signal that the upload completed to
168+ // permanent storage.
169+ let stream = stream.chain(
170+ signal_receiver
171+ .map(|()| Bytes::new())
172+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
173+ .into_stream(),
174+ );
175+
176+ let cache = cache_and_prune(
177+ cache,
178+ key,
179+ LFSObject::new(len, Box::new(stream)),
180+ lru,
181+ max_size,
182+ )
183+ .map_err(Error::from_cache);
184
185 Box::new(
186 f.map_err(Error::from_stream)
187 diff --git a/src/storage/disk.rs b/src/storage/disk.rs
188index 7e3d9c3..9b24387 100644
189--- a/src/storage/disk.rs
190+++ b/src/storage/disk.rs
191 @@ -22,11 +22,14 @@ use std::io;
192 use std::path::PathBuf;
193 use std::str::FromStr;
194
195- use bytes::BytesMut;
196- use futures::{future, Future, Stream};
197+ use bytes::{BufMut, Bytes, BytesMut};
198+ use futures::{
199+ future::{self, Either},
200+ Future, Stream,
201+ };
202 use tokio::{
203 self,
204- codec::{BytesCodec, Framed},
205+ codec::{Decoder, Encoder, Framed},
206 fs,
207 };
208 use uuid::Uuid;
209 @@ -35,6 +38,7 @@ use super::{
210 LFSObject, Namespace, Storage, StorageFuture, StorageKey, StorageStream,
211 };
212 use crate::lfs::Oid;
213+ use crate::util::NamedTempFile;
214
215 pub struct Backend {
216 root: PathBuf,
217 @@ -95,18 +99,43 @@ impl Storage for Backend {
218 let path = self.key_to_path(&key);
219 let dir = path.parent().unwrap().to_path_buf();
220
221+ let (len, stream) = value.into_parts();
222+
223 let incomplete = self.root.join("incomplete");
224 let temp_path = incomplete.join(Uuid::new_v4().to_string());
225- let temp_path2 = temp_path.clone();
226
227 Box::new(
228 fs::create_dir_all(incomplete)
229- .and_then(move |()| fs::File::create(temp_path))
230+ .and_then(move |()| {
231+ // Note that when this is dropped, the file is deleted.
232+ // Thus, if anything goes wrong we are not left with
233+ // a temporary file laying around.
234+ NamedTempFile::new(temp_path)
235+ })
236 .and_then(move |file| {
237- value.stream().forward(Framed::new(file, BytesCodec::new()))
238+ stream.forward(Framed::new(file, BytesCodec::new()))
239 })
240- .and_then(move |_| fs::create_dir_all(dir))
241- .and_then(move |()| fs::rename(temp_path2, path)),
242+ .and_then(move |(_, sink)| {
243+ let written = sink.codec().written();
244+ let file = sink.into_inner();
245+
246+ if written != len {
247+ // If we didn't get a full object, we cannot save it to
248+ // disk. This can happen if we're using the disk as
249+ // a cache and there is an error in the middle of the
250+ // upload.
251+ Either::A(future::err(io::Error::new(
252+ io::ErrorKind::Other,
253+ "got incomplete object",
254+ )))
255+ } else {
256+ Either::B(
257+ fs::create_dir_all(dir)
258+ .and_then(move |()| file.persist(path))
259+ .map(|_| ()),
260+ )
261+ }
262+ }),
263 )
264 }
265
266 @@ -142,7 +171,7 @@ impl Storage for Backend {
267 ///
268 /// The directory structure is assumed to be like this:
269 ///
270- /// objects
271+ /// objects/{org}/{project}/
272 /// ├── 00
273 /// │ ├── 07
274 /// │ │ └── 0007941906960...
275 @@ -200,3 +229,52 @@ impl Storage for Backend {
276 )
277 }
278 }
279+
280+ /// A simple bytes codec that keeps track of its length.
281+ struct BytesCodec {
282+ written: u64,
283+ }
284+
285+ impl BytesCodec {
286+ pub fn new() -> Self {
287+ BytesCodec { written: 0 }
288+ }
289+
290+ pub fn written(&self) -> u64 {
291+ self.written
292+ }
293+ }
294+
295+ impl Decoder for BytesCodec {
296+ type Item = BytesMut;
297+ type Error = io::Error;
298+
299+ fn decode(
300+ &mut self,
301+ buf: &mut BytesMut,
302+ ) -> Result<Option<Self::Item>, Self::Error> {
303+ if !buf.is_empty() {
304+ let len = buf.len();
305+ Ok(Some(buf.split_to(len)))
306+ } else {
307+ Ok(None)
308+ }
309+ }
310+ }
311+
312+ impl Encoder for BytesCodec {
313+ type Item = Bytes;
314+ type Error = io::Error;
315+
316+ fn encode(
317+ &mut self,
318+ data: Bytes,
319+ buf: &mut BytesMut,
320+ ) -> Result<(), io::Error> {
321+ let len = data.len();
322+ self.written += len as u64;
323+ buf.reserve(len);
324+ buf.put(data);
325+ Ok(())
326+ }
327+ }
328 diff --git a/src/storage/faulty.rs b/src/storage/faulty.rs
329new file mode 100644
330index 0000000..8f3e5b3
331--- /dev/null
332+++ b/src/storage/faulty.rs
333 @@ -0,0 +1,157 @@
334+ // Copyright (c) 2019 Jason White
335+ //
336+ // Permission is hereby granted, free of charge, to any person obtaining a copy
337+ // of this software and associated documentation files (the "Software"), to deal
338+ // in the Software without restriction, including without limitation the rights
339+ // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
340+ // copies of the Software, and to permit persons to whom the Software is
341+ // furnished to do so, subject to the following conditions:
342+ //
343+ // The above copyright notice and this permission notice shall be included in
344+ // all copies or substantial portions of the Software.
345+ //
346+ // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
347+ // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
348+ // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
349+ // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
350+ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
351+ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
352+ // SOFTWARE.
353+ use std::io;
354+
355+ use derive_more::{Display, From};
356+ use futures::{try_ready, Async, Future, Poll, Stream};
357+ use rand::{self, Rng};
358+
359+ use super::{LFSObject, Storage, StorageFuture, StorageKey, StorageStream};
360+
361+ #[derive(Debug, Display, From)]
362+ enum Error {
363+ Fault(FaultError),
364+ Io(io::Error),
365+ }
366+
367+ impl std::error::Error for Error {}
368+
369+ /// The "Faulty McFaultFace" storage backend. This is used for failure injection
370+ /// testing.
371+ ///
372+ /// This is a storage backend adaptor that will have its uploads or downloads
373+ /// randomly fail. The system should be robust enough to handle these failures
374+ /// gracefully.
375+ pub struct Backend<S> {
376+ storage: S,
377+ }
378+
379+ impl<S> Backend<S> {
380+ pub fn new(storage: S) -> Self {
381+ Backend { storage }
382+ }
383+ }
384+
385+ impl<S> Storage for Backend<S>
386+ where
387+ S: Storage + Send + Sync + 'static,
388+ S::Error: 'static,
389+ {
390+ type Error = S::Error;
391+
392+ fn get(
393+ &self,
394+ key: &StorageKey,
395+ ) -> StorageFuture<Option<LFSObject>, Self::Error> {
396+ Box::new(self.storage.get(key).map(move |obj| -> Option<_> {
397+ let (len, stream) = obj?.into_parts();
398+
399+ Some(LFSObject::new(len, Box::new(FaultyStream::new(stream))))
400+ }))
401+ }
402+
403+ fn put(
404+ &self,
405+ key: StorageKey,
406+ value: LFSObject,
407+ ) -> StorageFuture<(), Self::Error> {
408+ let (len, stream) = value.into_parts();
409+
410+ let stream = FaultyStream::new(stream);
411+
412+ self.storage.put(key, LFSObject::new(len, Box::new(stream)))
413+ }
414+
415+ fn size(
416+ &self,
417+ key: &StorageKey,
418+ ) -> StorageFuture<Option<u64>, Self::Error> {
419+ self.storage.size(key)
420+ }
421+
422+ fn delete(&self, key: &StorageKey) -> StorageFuture<(), Self::Error> {
423+ self.storage.delete(key)
424+ }
425+
426+ fn list(&self) -> StorageStream<(StorageKey, u64), Self::Error> {
427+ self.storage.list()
428+ }
429+
430+ fn total_size(&self) -> Option<u64> {
431+ self.storage.total_size()
432+ }
433+
434+ fn max_size(&self) -> Option<u64> {
435+ self.storage.max_size()
436+ }
437+ }
438+
439+ #[derive(Debug, Display)]
440+ #[display(fmt = "injected fault")]
441+ pub struct FaultError;
442+
443+ impl std::error::Error for FaultError {}
444+
445+ impl From<FaultError> for io::Error {
446+ fn from(error: FaultError) -> io::Error {
447+ io::Error::new(io::ErrorKind::Other, error.to_string())
448+ }
449+ }
450+
451+ /// A stream that has random failures.
452+ ///
453+ /// One out of 256 items of the stream will fail.
454+ pub struct FaultyStream<S> {
455+ /// The underlying stream.
456+ stream: S,
457+ }
458+
459+ impl<S> FaultyStream<S> {
460+ pub fn new(stream: S) -> Self {
461+ FaultyStream { stream }
462+ }
463+ }
464+
465+ impl<S> Stream for FaultyStream<S>
466+ where
467+ S: Stream,
468+ S::Error: From<FaultError>,
469+ {
470+ type Item = S::Item;
471+ type Error = S::Error;
472+
473+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
474+ let item = try_ready!(self.stream.poll());
475+
476+ match item {
477+ Some(item) => {
478+ if rand::thread_rng().gen::<u8>() == 0 {
479+ Err(FaultError.into())
480+ } else {
481+ Ok(Async::Ready(Some(item)))
482+ }
483+ }
484+ None => {
485+ // End of stream.
486+ Ok(Async::Ready(None))
487+ }
488+ }
489+ }
490+ }
491 diff --git a/src/storage/mod.rs b/src/storage/mod.rs
492index 028b67a..95c19a9 100644
493--- a/src/storage/mod.rs
494+++ b/src/storage/mod.rs
495 @@ -20,6 +20,8 @@
496 mod cached;
497 mod disk;
498 mod encrypt;
499+ #[cfg(feature = "faulty")]
500+ mod faulty;
501 mod retrying;
502 mod s3;
503 mod verify;
504 @@ -27,6 +29,8 @@ mod verify;
505 pub use cached::{Backend as Cached, Error as CacheError};
506 pub use disk::Backend as Disk;
507 pub use encrypt::Backend as Encrypted;
508+ #[cfg(feature = "faulty")]
509+ pub use faulty::Backend as Faulty;
510 pub use retrying::Backend as Retrying;
511 pub use s3::{Backend as S3, Error as S3Error};
512 pub use verify::Backend as Verify;
513 @@ -147,38 +151,6 @@ impl LFSObject {
514 ///
515 /// This is useful for caching LFS objects while simultaneously sending them
516 /// to a client.
517- pub fn split(self) -> (Self, Self) {
518- let (len, stream) = self.into_parts();
519-
520- let (sender, receiver) = mpsc::channel(0);
521-
522- let stream = stream.and_then(move |chunk| {
523- // TODO: Find a way to not clone the sender.
524- sender
525- .clone()
526- .send(chunk.clone())
527- .and_then(move |_| Ok(chunk))
528- .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
529- });
530-
531- let a = LFSObject {
532- len,
533- stream: Box::new(stream),
534- };
535-
536- let b = LFSObject {
537- len,
538- stream: Box::new(receiver.map_err(|()| {
539- io::Error::new(
540- io::ErrorKind::Other,
541- "failed during body duplication",
542- )
543- })),
544- };
545-
546- (a, b)
547- }
548-
549 pub fn fanout(
550 self,
551 ) -> (impl Future<Item = (), Error = io::Error>, Self, Self) {
552 diff --git a/src/util.rs b/src/util.rs
553new file mode 100644
554index 0000000..04a45fe
555--- /dev/null
556+++ b/src/util.rs
557 @@ -0,0 +1,149 @@
558+ // Copyright (c) 2019 Jason White
559+ //
560+ // Permission is hereby granted, free of charge, to any person obtaining a copy
561+ // of this software and associated documentation files (the "Software"), to deal
562+ // in the Software without restriction, including without limitation the rights
563+ // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
564+ // copies of the Software, and to permit persons to whom the Software is
565+ // furnished to do so, subject to the following conditions:
566+ //
567+ // The above copyright notice and this permission notice shall be included in
568+ // all copies or substantial portions of the Software.
569+ //
570+ // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
571+ // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
572+ // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
573+ // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
574+ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
575+ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
576+ // SOFTWARE.
577+
578+ use std::io;
579+ use std::mem;
580+ use std::ops::Deref;
581+ use std::path::{Path, PathBuf};
582+
583+ use futures::{Future, Poll};
584+ use tokio::{
585+ fs,
586+ io::{AsyncRead, AsyncWrite},
587+ };
588+
589+ /// A temporary file path. When dropped, the file is deleted.
590+ #[derive(Debug)]
591+ pub struct TempPath(PathBuf);
592+
593+ impl TempPath {
594+ /// Renames the file without deleting it.
595+ pub fn persist<P: AsRef<Path>>(
596+ mut self,
597+ new_path: P,
598+ ) -> impl Future<Item = (), Error = io::Error> {
599+ // Don't drop self. We want to avoid deleting the file here and also
600+ // avoid leaking memory.
601+ let path = mem::replace(&mut self.0, PathBuf::new());
602+ mem::forget(self);
603+
604+ fs::rename(path, new_path)
605+ }
606+ }
607+
608+ impl Deref for TempPath {
609+ type Target = Path;
610+
611+ fn deref(&self) -> &Path {
612+ &self.0
613+ }
614+ }
615+
616+ impl AsRef<Path> for TempPath {
617+ fn as_ref(&self) -> &Path {
618+ &self.0
619+ }
620+ }
621+
622+ impl Drop for TempPath {
623+ fn drop(&mut self) {
624+ // Note that this is a synchronous call. We can't really return a future
625+ // to do this.
626+ let _ = std::fs::remove_file(&self.0);
627+ }
628+ }
629+
630+ /// A temporary async file.
631+ pub struct NamedTempFile {
632+ path: TempPath,
633+ file: fs::File,
634+ }
635+
636+ impl NamedTempFile {
637+ pub fn new<P>(temp_path: P) -> impl Future<Item = Self, Error = io::Error>
638+ where
639+ P: AsRef<Path> + Send + 'static,
640+ {
641+ let path = TempPath(temp_path.as_ref().to_owned());
642+ fs::File::create(temp_path)
643+ .map(move |file| NamedTempFile { path, file })
644+ }
645+
646+ pub fn into_parts(self) -> (fs::File, TempPath) {
647+ (self.file, self.path)
648+ }
649+
650+ pub fn persist<P: AsRef<Path>>(
651+ self,
652+ new_path: P,
653+ ) -> impl Future<Item = fs::File, Error = io::Error> {
654+ let (file, path) = self.into_parts();
655+ path.persist(new_path).map(move |()| file)
656+ }
657+ }
658+
659+ impl AsRef<Path> for NamedTempFile {
660+ fn as_ref(&self) -> &Path {
661+ &self.path
662+ }
663+ }
664+
665+ impl AsRef<fs::File> for NamedTempFile {
666+ fn as_ref(&self) -> &fs::File {
667+ &self.file
668+ }
669+ }
670+
671+ impl AsMut<fs::File> for NamedTempFile {
672+ fn as_mut(&mut self) -> &mut fs::File {
673+ &mut self.file
674+ }
675+ }
676+
677+ impl io::Read for NamedTempFile {
678+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
679+ self.file.read(buf)
680+ }
681+ }
682+
683+ impl io::Write for NamedTempFile {
684+ #[inline]
685+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
686+ self.file.write(buf)
687+ }
688+
689+ #[inline]
690+ fn flush(&mut self) -> io::Result<()> {
691+ self.file.flush()
692+ }
693+ }
694+
695+ impl AsyncRead for NamedTempFile {
696+ #[inline]
697+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
698+ self.file.prepare_uninitialized_buffer(buf)
699+ }
700+ }
701+
702+ impl AsyncWrite for NamedTempFile {
703+ fn shutdown(&mut self) -> Poll<(), io::Error> {
704+ self.file.shutdown()
705+ }
706+ }