Author: Daniel Noland [daniel@stateless.net]
Committer: Jason White [github@jasonwhite.io] Sun, 25 Sep 2022 21:45:24 +0000
Hash: a09123c3e90cdb3984ac056fbd2f55215fc71300
Timestamp: Sun, 25 Sep 2022 21:45:24 +0000 (2 years ago)

+48 -57 +/-3 browse
Fixup logic for "faulty"
Fixup logic for "faulty"

The faulty test is interesting but it needs refactor before it will work with current tokio.

This patch attempts that refactor.

I didn't notice that the "faulty" test was failing to build before due to the feature flag being off.
1diff --git a/src/lfs.rs b/src/lfs.rs
2index 87d0332..564b213 100644
3--- a/src/lfs.rs
4+++ b/src/lfs.rs
5 @@ -116,7 +116,7 @@ pub struct ResponseObject {
6 pub error: Option<ObjectError>,
7
8 /// Optional boolean specifying whether the request for this specific
9- /// object is authenticated. If ommitted or `false`, Git LFS will
10+ /// object is authenticated. If omitted or `false`, Git LFS will
11 /// attempt to find credentials for this URL.
12 #[serde(skip_serializing_if = "Option::is_none")]
13 pub authenticated: Option<bool>,
14 @@ -145,7 +145,7 @@ pub struct BatchRequest {
15 ///
16 /// Note: Git LFS currently only supports the `basic` transfer adapter.
17 /// This property was added for future compatibility with some experimental
18- /// tranfer adapters.
19+ /// transfer adapters.
20 #[serde(skip_serializing_if = "Option::is_none")]
21 pub transfers: Option<Vec<Transfer>>,
22
23 @@ -164,7 +164,7 @@ pub struct BatchRequest {
24 pub struct BatchResponse {
25 /// String identifier of the transfer adapter that the server prefers. This
26 /// *must* be one of the given `transfer` identifiers from the request.
27- /// Servers can assume the `basic` transfer adaptor if `None` was given.
28+ /// Servers can assume the `basic` transfer adapter if `None` was given.
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub transfer: Option<Transfer>,
31
32 diff --git a/src/main.rs b/src/main.rs
33index f511920..0e19f1e 100644
34--- a/src/main.rs
35+++ b/src/main.rs
36 @@ -25,8 +25,11 @@ use structopt::StructOpt;
37
38 use rudolfs::{Cache, LocalServerBuilder, S3ServerBuilder};
39
40- #[cfg(feature = "faulty")]
41- use crate::storage::Faulty;
42+ mod lfs;
43+ mod lru;
44+ mod sha256;
45+ mod storage;
46+ mod util;
47
48 // Additional help to append to the end when `--help` is specified.
49 static AFTER_HELP: &str = include_str!("help.md");
50 diff --git a/src/storage/faulty.rs b/src/storage/faulty.rs
51index cc56e39..910ed7d 100644
52--- a/src/storage/faulty.rs
53+++ b/src/storage/faulty.rs
54 @@ -18,10 +18,12 @@
55 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
56 // SOFTWARE.
57 use std::io;
58+ use std::time::Duration;
59
60+ use crate::storage::ByteStream;
61 use async_trait::async_trait;
62 use derive_more::{Display, From};
63- use futures::{try_ready, Async, Future, Poll, Stream};
64+ use futures::StreamExt;
65 use rand::{self, Rng};
66
67 use super::{LFSObject, Storage, StorageKey, StorageStream};
68 @@ -50,6 +52,16 @@ impl<S> Backend<S> {
69 }
70 }
71
72+ fn faulty_stream(stream: ByteStream) -> ByteStream {
73+ Box::pin(stream.map(|item| {
74+ if rand::thread_rng().gen::<u8>() == 0 {
75+ Err(io::Error::new(io::ErrorKind::Other, "injected fault"))
76+ } else {
77+ item
78+ }
79+ }))
80+ }
81+
82 #[async_trait]
83 impl<S> Storage for Backend<S>
84 where
85 @@ -62,11 +74,16 @@ where
86 &self,
87 key: &StorageKey,
88 ) -> Result<Option<LFSObject>, Self::Error> {
89- Box::pin(self.storage.get(key).map(move |obj| -> Option<_> {
90- let (len, stream) = obj?.into_parts();
91-
92- Some(LFSObject::new(len, Box::pin(FaultyStream::new(stream))))
93- }))
94+ let obj = self.storage.get(key).await;
95+ match obj {
96+ Ok(Some(lfs_obj)) => {
97+ let (len, s) = lfs_obj.into_parts();
98+ let fs = faulty_stream(s);
99+ Ok(Some(LFSObject::new(len, Box::pin(fs))))
100+ }
101+ Ok(None) => Ok(None),
102+ Err(err) => Err(err),
103+ }
104 }
105
106 async fn put(
107 @@ -76,17 +93,17 @@ where
108 ) -> Result<(), Self::Error> {
109 let (len, stream) = value.into_parts();
110
111- let stream = FaultyStream::new(stream);
112-
113- self.storage.put(key, LFSObject::new(len, Box::pin(stream)))
114+ self.storage
115+ .put(key, LFSObject::new(len, faulty_stream(stream)))
116+ .await
117 }
118
119 async fn size(&self, key: &StorageKey) -> Result<Option<u64>, Self::Error> {
120- self.storage.size(key)
121+ self.storage.size(key).await
122 }
123
124 async fn delete(&self, key: &StorageKey) -> Result<(), Self::Error> {
125- self.storage.delete(key)
126+ self.storage.delete(key).await
127 }
128
129 fn list(&self) -> StorageStream<(StorageKey, u64), Self::Error> {
130 @@ -100,6 +117,18 @@ where
131 async fn max_size(&self) -> Option<u64> {
132 self.storage.max_size().await
133 }
134+
135+ fn public_url(&self, key: &StorageKey) -> Option<String> {
136+ self.storage.public_url(key)
137+ }
138+
139+ async fn upload_url(
140+ &self,
141+ key: &StorageKey,
142+ expires_in: Duration,
143+ ) -> Option<String> {
144+ self.storage.upload_url(key, expires_in).await
145+ }
146 }
147
148 #[derive(Debug, Display)]
149 @@ -113,44 +142,3 @@ impl From<FaultError> for io::Error {
150 io::Error::new(io::ErrorKind::Other, error.to_string())
151 }
152 }
153-
154- /// A stream that has random failures.
155- ///
156- /// One out of 256 items of the stream will fail.
157- pub struct FaultyStream<S> {
158- /// The underlying stream.
159- stream: S,
160- }
161-
162- impl<S> FaultyStream<S> {
163- pub fn new(stream: S) -> Self {
164- FaultyStream { stream }
165- }
166- }
167-
168- impl<S> Stream for FaultyStream<S>
169- where
170- S: Stream,
171- S::Error: From<FaultError>,
172- {
173- type Item = S::Item;
174- type Error = S::Error;
175-
176- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
177- let item = try_ready!(self.stream.poll());
178-
179- match item {
180- Some(item) => {
181- if rand::thread_rng().gen::<u8>() == 0 {
182- Err(FaultError.into())
183- } else {
184- Ok(Async::Ready(Some(item)))
185- }
186- }
187- None => {
188- // End of stream.
189- Ok(Async::Ready(None))
190- }
191- }
192- }
193- }