Commit
Author: Jason White [github@jasonwhite.io]
Hash: f9cb5eeffa1a8b4402af2da2dbcf69538054d578
Timestamp: Fri, 25 Jun 2021 01:57:37 +0000 (3 years ago)

+303 -96 +/-3 browse
Allow Rudolfs to be used as a library
Allow Rudolfs to be used as a library

This makes integration testing easier.
1diff --git a/src/lib.rs b/src/lib.rs
2new file mode 100644
3index 0000000..df3ea7f
4--- /dev/null
5+++ b/src/lib.rs
6 @@ -0,0 +1,265 @@
7+ // Copyright (c) 2021 Jason White
8+ //
9+ // Permission is hereby granted, free of charge, to any person obtaining a copy
10+ // of this software and associated documentation files (the "Software"), to deal
11+ // in the Software without restriction, including without limitation the rights
12+ // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13+ // copies of the Software, and to permit persons to whom the Software is
14+ // furnished to do so, subject to the following conditions:
15+ //
16+ // The above copyright notice and this permission notice shall be included in
17+ // all copies or substantial portions of the Software.
18+ //
19+ // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20+ // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21+ // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22+ // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23+ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24+ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25+ // SOFTWARE.
26+ mod app;
27+ mod error;
28+ mod hyperext;
29+ mod lfs;
30+ mod logger;
31+ mod lru;
32+ mod sha256;
33+ mod storage;
34+ mod util;
35+
36+ use std::convert::Infallible;
37+ use std::net::SocketAddr;
38+ use std::path::PathBuf;
39+ use std::sync::Arc;
40+
41+ use futures::future::{self, Future, TryFutureExt};
42+ use hyper::{
43+ self,
44+ server::conn::{AddrIncoming, AddrStream},
45+ service::make_service_fn,
46+ };
47+
48+ use crate::app::App;
49+ use crate::error::Error;
50+ use crate::logger::Logger;
51+ use crate::storage::{Cached, Disk, Encrypted, Retrying, Storage, Verify, S3};
52+
53+ #[cfg(feature = "faulty")]
54+ use crate::storage::Faulty;
55+
56+ /// Represents a running LFS server.
57+ pub trait Server: Future<Output = hyper::Result<()>> {
58+ /// Returns the local address this server is bound to.
59+ fn addr(&self) -> SocketAddr;
60+ }
61+
62+ impl<S, E> Server for hyper::Server<AddrIncoming, S, E>
63+ where
64+ hyper::Server<AddrIncoming, S, E>: Future<Output = hyper::Result<()>>,
65+ {
66+ fn addr(&self) -> SocketAddr {
67+ self.local_addr()
68+ }
69+ }
70+
71+ #[derive(Debug)]
72+ pub struct Cache {
73+ /// Path to the cache.
74+ dir: PathBuf,
75+
76+ /// Maximum size of the cache, in bytes.
77+ max_size: u64,
78+ }
79+
80+ impl Cache {
81+ pub fn new(dir: PathBuf, max_size: u64) -> Self {
82+ Self { dir, max_size }
83+ }
84+ }
85+
86+ #[derive(Debug)]
87+ pub struct S3ServerBuilder {
88+ bucket: String,
89+ key: [u8; 32],
90+ prefix: Option<String>,
91+ cdn: Option<String>,
92+ cache: Option<Cache>,
93+ }
94+
95+ impl S3ServerBuilder {
96+ pub fn new(bucket: String, key: [u8; 32]) -> Self {
97+ Self {
98+ bucket,
99+ prefix: None,
100+ cdn: None,
101+ key,
102+ cache: None,
103+ }
104+ }
105+
106+ /// Sets the bucket to use.
107+ pub fn bucket(&mut self, bucket: String) -> &mut Self {
108+ self.bucket = bucket;
109+ self
110+ }
111+
112+ /// Sets the encryption key to use.
113+ pub fn key(&mut self, key: [u8; 32]) -> &mut Self {
114+ self.key = key;
115+ self
116+ }
117+
118+ /// Sets the prefix to use.
119+ pub fn prefix(&mut self, prefix: String) -> &mut Self {
120+ self.prefix = Some(prefix);
121+ self
122+ }
123+
124+ /// Sets the base URL of the CDN to use. This is incompatible with
125+ /// encryption since the LFS object is not sent to Rudolfs.
126+ pub fn cdn(&mut self, url: String) -> &mut Self {
127+ self.cdn = Some(url);
128+ self
129+ }
130+
131+ /// Sets the cache to use. If not specified, then no local disk cache is
132+ /// used. All objects will get sent directly to S3.
133+ pub fn cache(&mut self, cache: Cache) -> &mut Self {
134+ self.cache = Some(cache);
135+ self
136+ }
137+
138+ /// Spawns the server. The server must be awaited on in order to accept
139+ /// incoming client connections and run.
140+ pub async fn spawn(
141+ self,
142+ addr: SocketAddr,
143+ ) -> Result<Box<dyn Server + Unpin>, Box<dyn std::error::Error>> {
144+ let prefix = self.prefix.unwrap_or_else(|| String::from("lfs"));
145+
146+ let s3 = S3::new(self.bucket, prefix, self.cdn)
147+ .map_err(Error::from)
148+ .await?;
149+
150+ // Retry certain operations to S3 to make it more reliable.
151+ let s3 = Retrying::new(s3);
152+
153+ // Add a little instability for testing purposes.
154+ #[cfg(feature = "faulty")]
155+ let s3 = Faulty::new(s3);
156+
157+ match self.cache {
158+ Some(cache) => {
159+ // Use disk storage as a cache.
160+ let disk = Disk::new(cache.dir).map_err(Error::from).await?;
161+
162+ #[cfg(feature = "faulty")]
163+ let disk = Faulty::new(disk);
164+
165+ let cache = Cached::new(cache.max_size, disk, s3).await?;
166+ let storage = Verify::new(Encrypted::new(self.key, cache));
167+ Ok(Box::new(spawn_server(storage, &addr)))
168+ }
169+ None => {
170+ let storage = Verify::new(Encrypted::new(self.key, s3));
171+ Ok(Box::new(spawn_server(storage, &addr)))
172+ }
173+ }
174+ }
175+
176+ /// Spawns the server and runs it to completion. This will run forever
177+ /// unless there is an error or the server shuts down gracefully.
178+ pub async fn run(
179+ self,
180+ addr: SocketAddr,
181+ ) -> Result<(), Box<dyn std::error::Error>> {
182+ let server = self.spawn(addr).await?;
183+
184+ log::info!("Listening on {}", server.addr());
185+
186+ server.await?;
187+ Ok(())
188+ }
189+ }
190+
191+ #[derive(Debug)]
192+ pub struct LocalServerBuilder {
193+ path: PathBuf,
194+ key: [u8; 32],
195+ cache: Option<Cache>,
196+ }
197+
198+ impl LocalServerBuilder {
199+ /// Creates a local server builder. `path` is the path to the folder where
200+ /// all of the LFS data will be stored.
201+ pub fn new(path: PathBuf, key: [u8; 32]) -> Self {
202+ Self {
203+ path,
204+ key,
205+ cache: None,
206+ }
207+ }
208+
209+ /// Sets the encryption key to use.
210+ pub fn key(&mut self, key: [u8; 32]) -> &mut Self {
211+ self.key = key;
212+ self
213+ }
214+
215+ /// Sets the cache to use. If not specified, then no local disk cache is
216+ /// used. It is uncommon to want to use this when the object storage is
217+ /// already local. However, a cache may be useful when the data storage path
218+ /// is on a mounted network file system. In such a case, the network file
219+ /// system could be slow and the local disk storage could be fast.
220+ pub fn cache(&mut self, cache: Cache) -> &mut Self {
221+ self.cache = Some(cache);
222+ self
223+ }
224+
225+ /// Spawns the server. The server must be awaited on in order to accept
226+ /// incoming client connections and run.
227+ pub async fn spawn(
228+ self,
229+ addr: SocketAddr,
230+ ) -> Result<impl Server, Box<dyn std::error::Error>> {
231+ let storage = Disk::new(self.path).map_err(Error::from).await?;
232+ let storage = Verify::new(Encrypted::new(self.key, storage));
233+
234+ log::info!("Local disk storage initialized.");
235+
236+ Ok(spawn_server(storage, &addr))
237+ }
238+
239+ /// Spawns the server and runs it to completion. This will run forever
240+ /// unless there is an error or the server shuts down gracefully.
241+ pub async fn run(
242+ self,
243+ addr: SocketAddr,
244+ ) -> Result<(), Box<dyn std::error::Error>> {
245+ let server = self.spawn(addr).await?;
246+
247+ log::info!("Listening on {}", server.addr());
248+
249+ server.await?;
250+ Ok(())
251+ }
252+ }
253+
254+ fn spawn_server<S>(storage: S, addr: &SocketAddr) -> impl Server
255+ where
256+ S: Storage + Send + Sync + 'static,
257+ S::Error: Into<Error>,
258+ Error: From<S::Error>,
259+ {
260+ let storage = Arc::new(storage);
261+
262+ let new_service = make_service_fn(move |socket: &AddrStream| {
263+ // Create our app.
264+ let service = App::new(storage.clone());
265+
266+ // Add logging middleware
267+ future::ok::<_, Infallible>(Logger::new(socket.remote_addr(), service))
268+ });
269+
270+ hyper::Server::bind(&addr).serve(new_service)
271+ }
272 diff --git a/src/main.rs b/src/main.rs
273index 1975421..f511920 100644
274--- a/src/main.rs
275+++ b/src/main.rs
276 @@ -1,4 +1,4 @@
277- // Copyright (c) 2019 Jason White
278+ // Copyright (c) 2021 Jason White
279 //
280 // Permission is hereby granted, free of charge, to any person obtaining a copy
281 // of this software and associated documentation files (the "Software"), to deal
282 @@ -17,30 +17,13 @@
283 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
284 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
285 // SOFTWARE.
286- mod app;
287- mod error;
288- mod hyperext;
289- mod lfs;
290- mod logger;
291- mod lru;
292- mod sha256;
293- mod storage;
294- mod util;
295-
296- use std::convert::Infallible;
297 use std::net::{SocketAddr, ToSocketAddrs};
298 use std::path::PathBuf;
299- use std::sync::Arc;
300
301- use futures::future::{self, TryFutureExt};
302 use hex::FromHex;
303- use hyper::{self, server::conn::AddrStream, service::make_service_fn, Server};
304 use structopt::StructOpt;
305
306- use crate::app::App;
307- use crate::error::Error;
308- use crate::logger::Logger;
309- use crate::storage::{Cached, Disk, Encrypted, Retrying, Storage, Verify, S3};
310+ use rudolfs::{Cache, LocalServerBuilder, S3ServerBuilder};
311
312 #[cfg(feature = "faulty")]
313 use crate::storage::Faulty;
314 @@ -176,43 +159,22 @@ impl S3Args {
315 addr: SocketAddr,
316 global_args: GlobalArgs,
317 ) -> Result<(), Box<dyn std::error::Error>> {
318- let s3 = S3::new(self.bucket, self.prefix, self.cdn)
319- .map_err(Error::from)
320- .await?;
321-
322- // Retry certain operations to S3 to make it more reliable.
323- let s3 = Retrying::new(s3);
324-
325- // Add a little instability for testing purposes.
326- #[cfg(feature = "faulty")]
327- let s3 = Faulty::new(s3);
328-
329- match global_args.cache_dir {
330- Some(cache_dir) => {
331- // Convert cache size to bytes.
332- let max_cache_size = global_args
333- .max_cache_size
334- .into::<human_size::Byte>()
335- .value() as u64;
336-
337- // Use disk storage as a cache.
338- let disk = Disk::new(cache_dir).map_err(Error::from).await?;
339-
340- #[cfg(feature = "faulty")]
341- let disk = Faulty::new(disk);
342-
343- let cache = Cached::new(max_cache_size, disk, s3).await?;
344- let storage =
345- Verify::new(Encrypted::new(global_args.key, cache));
346- run_server(storage, &addr).await?;
347- }
348- None => {
349- let storage = Verify::new(Encrypted::new(global_args.key, s3));
350- run_server(storage, &addr).await?;
351- }
352+ let mut builder = S3ServerBuilder::new(self.bucket, global_args.key);
353+ builder.prefix(self.prefix);
354+
355+ if let Some(cdn) = self.cdn {
356+ builder.cdn(cdn);
357 }
358
359- Ok(())
360+ if let Some(cache_dir) = global_args.cache_dir {
361+ let max_cache_size = global_args
362+ .max_cache_size
363+ .into::<human_size::Byte>()
364+ .value() as u64;
365+ builder.cache(Cache::new(cache_dir, max_cache_size));
366+ }
367+
368+ builder.run(addr).await
369 }
370 }
371
372 @@ -222,40 +184,20 @@ impl LocalArgs {
373 addr: SocketAddr,
374 global_args: GlobalArgs,
375 ) -> Result<(), Box<dyn std::error::Error>> {
376- let storage = Disk::new(self.path).map_err(Error::from).await?;
377- let storage = Verify::new(Encrypted::new(global_args.key, storage));
378-
379- log::info!("Local disk storage initialized.");
380+ let mut builder = LocalServerBuilder::new(self.path, global_args.key);
381+
382+ if let Some(cache_dir) = global_args.cache_dir {
383+ let max_cache_size = global_args
384+ .max_cache_size
385+ .into::<human_size::Byte>()
386+ .value() as u64;
387+ builder.cache(Cache::new(cache_dir, max_cache_size));
388+ }
389
390- run_server(storage, &addr).await?;
391- Ok(())
392+ builder.run(addr).await
393 }
394 }
395
396- async fn run_server<S>(storage: S, addr: &SocketAddr) -> hyper::Result<()>
397- where
398- S: Storage + Send + Sync + 'static,
399- S::Error: Into<Error>,
400- Error: From<S::Error>,
401- {
402- let storage = Arc::new(storage);
403-
404- let new_service = make_service_fn(move |socket: &AddrStream| {
405- // Create our app.
406- let service = App::new(storage.clone());
407-
408- // Add logging middleware
409- future::ok::<_, Infallible>(Logger::new(socket.remote_addr(), service))
410- });
411-
412- let server = Server::bind(&addr).serve(new_service);
413-
414- log::info!("Listening on {}", server.local_addr());
415-
416- server.await?;
417- Ok(())
418- }
419-
420 #[tokio::main]
421 async fn main() {
422 let exit_code = if let Err(err) = Args::from_args().main().await {
423 diff --git a/src/storage/disk.rs b/src/storage/disk.rs
424index 86dd977..4b5a147 100644
425--- a/src/storage/disk.rs
426+++ b/src/storage/disk.rs
427 @@ -156,18 +156,18 @@ impl Storage for Backend {
428 ///
429 /// The directory structure is assumed to be like this:
430 ///
431- /// objects/{org}/{project}/
432- /// ├── 00
433- /// │ ├── 07
434- /// │ │ └── 0007941906960...
435- /// │ └── ff
436- /// │ └── 00ff9e9c69224...
437- /// ├── 01
438- /// │ ├── 89
439- /// │ │ └── 0189e5fd19477...
440- /// │ └── f5
441- /// │ └── 01f5c45c65e62...
442- /// ^^^^
443+ /// objects/{org}/{project}/
444+ /// ├── 00
445+ /// │ ├── 07
446+ /// │ │ └── 0007941906960...
447+ /// │ └── ff
448+ /// │ └── 00ff9e9c69224...
449+ /// ├── 01
450+ /// │ ├── 89
451+ /// │ │ └── 0189e5fd19477...
452+ /// │ └── f5
453+ /// │ └── 01f5c45c65e62...
454+ /// ^^^^
455 ///
456 /// Note that the first four characters are repeated in the file name so
457 /// that transforming the file name into an object ID is simpler.