Author:
Hash:
Timestamp:
+89 -46 +/-6 browse
Kevin Schoon [me@kevinschoon.com]
def7e197331ce2464ba9c4c1c238f9d75fce9224
Sat, 05 Apr 2025 20:19:04 +0000 (4 months ago)
1 | diff --git a/examples/server.rs b/examples/server.rs |
2 | index d6d8727..184692e 100644 |
3 | --- a/examples/server.rs |
4 | +++ b/examples/server.rs |
5 | @@ -11,7 +11,7 @@ use axum::{ |
6 | middleware::{self, Next}, |
7 | response::Response, |
8 | }; |
9 | - use papyri::storage_fs::FileSystem; |
10 | + use papyri::storage::Storage; |
11 | use tower::Layer; |
12 | use tower_http::{normalize_path::NormalizePathLayer, trace::TraceLayer}; |
13 | use tracing::{Level, info_span}; |
14 | @@ -29,11 +29,12 @@ async fn main() -> Result<(), Box<dyn Error>> { |
15 | let listener = tokio::net::TcpListener::bind(ADDRESS).await?; |
16 | tracing::info!("listening @ {}", ADDRESS); |
17 | |
18 | - let fs = FileSystem::new(Path::new("registry")); |
19 | + |
20 | + let fs = Storage::FileSystem{ base: Path::new("registry").to_path_buf() }; |
21 | fs.init()?; |
22 | |
23 | // Registry middleware must be wrapped with namespace extraction/rewrite. |
24 | - let registry = papyri::routes::router(fs); |
25 | + let registry = papyri::routes::router(&fs); |
26 | let middleware = tower::util::MapRequestLayer::new(papyri::routes::extract_namespace); |
27 | |
28 | let router = |
29 | diff --git a/src/handlers.rs b/src/handlers.rs |
30 | index fe33325..91c9488 100644 |
31 | --- a/src/handlers.rs |
32 | +++ b/src/handlers.rs |
33 | @@ -1,4 +1,4 @@ |
34 | - use std::str::FromStr; |
35 | + use std::{str::FromStr, sync::Arc}; |
36 | |
37 | use axum::{ |
38 | Extension, Json, |
39 | @@ -28,7 +28,7 @@ pub async fn index() -> Result<Json<serde_json::Value>, Error> { |
40 | } |
41 | |
42 | pub async fn read_manifest( |
43 | - State(state): State<AppState>, |
44 | + State(state): State<Arc<AppState>>, |
45 | Path((name, reference)): Path<(String, String)>, |
46 | ) -> Result<Json<ImageManifest>, Error> { |
47 | let namespace = Namespace::from_str(&name)?; |
48 | @@ -38,7 +38,7 @@ pub async fn read_manifest( |
49 | } |
50 | |
51 | pub async fn read_blob( |
52 | - State(state): State<AppState>, |
53 | + State(state): State<Arc<AppState>>, |
54 | Extension(namespace): Extension<Namespace>, |
55 | Path(digest): Path<String>, |
56 | ) { |
57 | @@ -46,7 +46,7 @@ pub async fn read_blob( |
58 | } |
59 | |
60 | pub async fn stat_blob( |
61 | - State(state): State<AppState>, |
62 | + State(state): State<Arc<AppState>>, |
63 | Path(digest): Path<String>, |
64 | ) -> Result<StatusCode, Error> { |
65 | let digest = Digest::from_str(&digest)?; |
66 | @@ -89,7 +89,7 @@ where |
67 | |
68 | pub async fn write_manifest( |
69 | Extension(namespace): Extension<Namespace>, |
70 | - State(state): State<AppState>, |
71 | + State(state): State<Arc<AppState>>, |
72 | Path(reference): Path<String>, |
73 | ManifestExtractor((manifest_bytes, manifest)): ManifestExtractor, |
74 | ) -> Result<StatusCode, Error> { |
75 | @@ -108,7 +108,7 @@ pub struct UploadQuery { |
76 | |
77 | /// Write a complete blob in one request |
78 | pub async fn write_blob( |
79 | - State(state): State<AppState>, |
80 | + State(state): State<Arc<AppState>>, |
81 | Path(upload_uuid): Path<String>, |
82 | Query(query): Query<UploadQuery>, |
83 | req: Request, |
84 | @@ -124,7 +124,7 @@ pub async fn write_blob( |
85 | } |
86 | |
87 | pub async fn write_blob_chunk( |
88 | - State(state): State<AppState>, |
89 | + State(state): State<Arc<AppState>>, |
90 | Path(upload_uuid): Path<String>, |
91 | req: Request, |
92 | ) -> Result<Response, Error> { |
93 | @@ -144,7 +144,7 @@ pub async fn write_blob_chunk( |
94 | } |
95 | |
96 | pub async fn close_blob( |
97 | - State(state): State<AppState>, |
98 | + State(state): State<Arc<AppState>>, |
99 | Path(upload_uuid): Path<String>, |
100 | Query(query): Query<UploadQuery>, |
101 | req: Request, |
102 | @@ -168,7 +168,7 @@ const OCI_CHUNK_MIN_LENGTH: usize = 10_000_000; |
103 | /// FIXME: Per the spec for chunked uploads a header of Content-Length: 0 |
104 | /// MUST be present (for some reason). |
105 | pub async fn initiate_blob( |
106 | - State(state): State<AppState>, |
107 | + State(state): State<Arc<AppState>>, |
108 | Extension(namespace): Extension<Namespace>, |
109 | ) -> Result<Response, Error> { |
110 | let uuid = state.oci.new_blob(&namespace).await?; |
111 | @@ -188,7 +188,7 @@ pub async fn initiate_blob( |
112 | } |
113 | |
114 | pub(crate) async fn read_tags( |
115 | - State(state): State<AppState>, |
116 | + State(state): State<Arc<AppState>>, |
117 | Path(name): Path<String>, |
118 | ) -> Result<Json<TagList>, Error> { |
119 | let namespace = Namespace::from_str(&name)?; |
120 | diff --git a/src/oci_interface.rs b/src/oci_interface.rs |
121 | index 7e7ae67..80eec14 100644 |
122 | --- a/src/oci_interface.rs |
123 | +++ b/src/oci_interface.rs |
124 | @@ -12,7 +12,7 @@ use crate::{ |
125 | Namespace, |
126 | address::{Address, Blob, Manifest, TempBlob}, |
127 | error::Error, |
128 | - storage::Storage, |
129 | + storage::{Storage, StorageIface}, |
130 | }; |
131 | |
132 | pub mod paths { |
133 | @@ -21,17 +21,21 @@ pub mod paths { |
134 | |
135 | #[derive(Clone)] |
136 | pub(crate) struct OciInterface { |
137 | - pub storage: Arc<Box<dyn Storage>>, |
138 | + pub storage: Storage, |
139 | } |
140 | |
141 | impl OciInterface { |
142 | + fn store(&self) -> impl StorageIface { |
143 | + self.storage.inner() |
144 | + } |
145 | + |
146 | pub async fn list_tags(&self, namespace: &Namespace) -> Result<TagList, Error> { |
147 | todo!() |
148 | } |
149 | |
150 | pub async fn new_blob(&self, namespace: &Namespace) -> Result<Uuid, Error> { |
151 | let uuid = Uuid::new_v4(); |
152 | - self.storage |
153 | + self.store() |
154 | .write_all(&Address::new(&TempBlob::from(&uuid)), &[]) |
155 | .await |
156 | .map_err(Error::Storage)?; |
157 | @@ -41,7 +45,7 @@ impl OciInterface { |
158 | pub async fn commit_blob(&self, uuid: &Uuid, digest: &Digest) -> Result<(), Error> { |
159 | let tmp_blob_addr = Address::new(&TempBlob::from(uuid)); |
160 | let blob_addr = Address::data(&Blob::from(digest)); |
161 | - self.storage |
162 | + self.store() |
163 | .mv(&tmp_blob_addr, &blob_addr) |
164 | .await |
165 | .map_err(Error::Storage)?; |
166 | @@ -55,7 +59,7 @@ impl OciInterface { |
167 | let tmp_blob_addr = Address::new(&TempBlob::from(uuid)); |
168 | while let Some(item) = stream.next().await { |
169 | let chunk = item?.to_vec(); |
170 | - self.storage |
171 | + self.store() |
172 | .write(&tmp_blob_addr, chunk.as_slice()) |
173 | .await |
174 | .map_err(Error::Storage)?; |
175 | @@ -71,12 +75,12 @@ impl OciInterface { |
176 | manifest_bytes: &Bytes, |
177 | ) -> Result<(), Error> { |
178 | let tmp_blob_addr = Address::new(&TempBlob::from(&Uuid::new_v4())); |
179 | - self.storage |
180 | + self.storage.inner() |
181 | .write_all(&tmp_blob_addr, manifest_bytes.to_vec().as_slice()) |
182 | .await |
183 | .map_err(Error::Storage)?; |
184 | let blob_address = &Address::data(&Blob::from(digest)); |
185 | - self.storage |
186 | + self.store() |
187 | .mv(&tmp_blob_addr, blob_address) |
188 | .await |
189 | .map_err(Error::Storage)?; |
190 | @@ -84,7 +88,7 @@ impl OciInterface { |
191 | namespace, |
192 | name: tag, |
193 | }); |
194 | - self.storage |
195 | + self.store() |
196 | .write_all(tag_addr, digest.to_string().as_bytes()) |
197 | .await |
198 | .map_err(Error::Storage)?; |
199 | @@ -125,7 +129,7 @@ impl OciInterface { |
200 | |
201 | pub async fn has_blob(&self, digest: &Digest) -> Result<bool, Error> { |
202 | let blob_addr = Address::data(&Blob::from(digest)); |
203 | - self.storage |
204 | + self.store() |
205 | .exists(&blob_addr) |
206 | .await |
207 | .map_err(Error::Storage) |
208 | diff --git a/src/routes.rs b/src/routes.rs |
209 | index ffece69..43a8203 100644 |
210 | --- a/src/routes.rs |
211 | +++ b/src/routes.rs |
212 | @@ -57,7 +57,7 @@ pub fn extract_namespace(mut req: Request<axum::body::Body>) -> Request<axum::bo |
213 | |
214 | const MAXIMUM_MANIFEST_SIZE: usize = 5_000_000; |
215 | |
216 | - pub fn router(storage: impl Storage + 'static) -> Router { |
217 | + pub fn router(storage: &Storage) -> Router { |
218 | Router::new() |
219 | .route("/v2", get(crate::handlers::index)) |
220 | .route( |
221 | @@ -100,9 +100,9 @@ pub fn router(storage: impl Storage + 'static) -> Router { |
222 | // // "/{name}/blobs/{digest}", |
223 | // // delete(crate::handlers::delete_blob), |
224 | // // ) |
225 | - .with_state(AppState { |
226 | + .with_state(Arc::new(AppState { |
227 | oci: OciInterface { |
228 | - storage: Arc::new(Box::new(storage)), |
229 | + storage: storage.clone(), |
230 | }, |
231 | - }) |
232 | + })) |
233 | } |
234 | diff --git a/src/storage.rs b/src/storage.rs |
235 | index 0391647..24c4767 100644 |
236 | --- a/src/storage.rs |
237 | +++ b/src/storage.rs |
238 | @@ -1,6 +1,9 @@ |
239 | - use std::{io::Error as IoError}; |
240 | + use std::{io::Error as IoError, path::PathBuf, sync::Arc}; |
241 | |
242 | - use crate::address::Address; |
243 | + use async_trait::async_trait; |
244 | + use futures::{Stream, StreamExt, lock::Mutex}; |
245 | + |
246 | + use crate::{address::Address, storage_fs::FileSystem}; |
247 | |
248 | #[derive(thiserror::Error, Debug)] |
249 | pub enum Error { |
250 | @@ -15,17 +18,59 @@ pub enum Error { |
251 | /// The storage trait needs to be implemented for accessing objects from the |
252 | /// platform. This API is based on registry/storage/driver/storagedriver.go in |
253 | /// the distribution codebase. |
254 | - #[async_trait::async_trait] |
255 | - pub trait Storage: Sync + Send { |
256 | + pub(crate) trait StorageIface: Sync + Send { |
257 | /// List a single directory of objects |
258 | // async fn list(&self, addr: &Address) -> Result<Vec<Object>, Error>; |
259 | // async fn stat(&self, addr: &Address) -> Result<Option<Object>, Error>; |
260 | // async fn read_bytes(&self, addr: &Address) -> Result<Option<Vec<u8>>, Error>; |
261 | - /// Check if an object exists at the given address |
262 | - async fn exists(&self, path: &Address) -> Result<bool, Error>; |
263 | + /// Check if an object exists at the given address |
264 | + fn exists(&self, path: &Address) -> impl Future<Output = Result<bool, Error>> + Send; |
265 | /// Write bytes to the address, truncating any existing object |
266 | - async fn write_all(&self, path: &Address, bytes: &[u8]) -> Result<(), Error>; |
267 | + fn write_all( |
268 | + &self, |
269 | + path: &Address, |
270 | + bytes: &[u8], |
271 | + ) -> impl Future<Output = Result<(), Error>> + Send; |
272 | /// write bytes to a file that has already been created |
273 | - async fn write(&self, path: &Address, bytes: &[u8]) -> Result<(), Error>; |
274 | - async fn mv(&self, src: &Address, dst: &Address) -> Result<(), Error>; |
275 | + fn write(&self, path: &Address, bytes: &[u8]) |
276 | + -> impl Future<Output = Result<(), Error>> + Send; |
277 | + fn mv(&self, src: &Address, dst: &Address) -> impl Future<Output = Result<(), Error>> + Send; |
278 | + // fn mv (&self, )std::future::Future<Output = ()> + Send |
279 | + } |
280 | + |
281 | + #[derive(Debug, Clone)] |
282 | + pub enum Storage { |
283 | + FileSystem { base: PathBuf }, |
284 | } |
285 | + |
286 | + impl Storage { |
287 | + pub fn init(&self) -> Result<(), Error> { |
288 | + match self { |
289 | + Storage::FileSystem { base } => FileSystem { base: base.clone() }.init(), |
290 | + } |
291 | + } |
292 | + |
293 | + pub fn inner(&self) -> impl StorageIface { |
294 | + match self { |
295 | + Storage::FileSystem { base } => FileSystem { base: base.clone() }, |
296 | + } |
297 | + } |
298 | + } |
299 | + |
300 | + // #[async_trait] |
301 | + // pub trait StreamProcessor { |
302 | + // async fn process_stream(&self, stream: Arc<Mutex<impl Stream<Item = i32> + Send + Unpin + 'static>>); |
303 | + // } |
304 | + // |
305 | + // pub struct MyStreamProcessor; |
306 | + // |
307 | + // #[async_trait] |
308 | + // impl StreamProcessor for MyStreamProcessor { |
309 | + // async fn process_stream(&self, stream: Arc<Mutex<impl Stream<Item = i32> + Send + Unpin + 'static>>) { |
310 | + // let mut stream = stream.lock().await; |
311 | + // |
312 | + // while let Some(item) = stream.next().await { |
313 | + // println!("Processing item: {}", item); |
314 | + // } |
315 | + // } |
316 | + // } |
317 | diff --git a/src/storage_fs.rs b/src/storage_fs.rs |
318 | index faeb078..98b09f7 100644 |
319 | --- a/src/storage_fs.rs |
320 | +++ b/src/storage_fs.rs |
321 | @@ -7,21 +7,15 @@ use tokio::io::AsyncWriteExt; |
322 | |
323 | use crate::{ |
324 | address::Address, |
325 | - storage::{Error, Storage}, |
326 | + storage::{Error, StorageIface}, |
327 | }; |
328 | |
329 | #[derive(Clone)] |
330 | - pub struct FileSystem { |
331 | - base: PathBuf, |
332 | + pub(crate) struct FileSystem { |
333 | + pub base: PathBuf, |
334 | } |
335 | |
336 | impl FileSystem { |
337 | - pub fn new(path: &Path) -> Self { |
338 | - Self { |
339 | - base: path.to_path_buf(), |
340 | - } |
341 | - } |
342 | - |
343 | pub fn init(&self) -> Result<(), Error> { |
344 | std::fs::create_dir_all(self.base.as_path())?; |
345 | Ok(()) |
346 | @@ -34,8 +28,7 @@ impl FileSystem { |
347 | } |
348 | } |
349 | |
350 | - #[async_trait::async_trait] |
351 | - impl Storage for FileSystem { |
352 | + impl StorageIface for FileSystem { |
353 | async fn exists(&self, addr: &Address) -> Result<bool, Error> { |
354 | let path = addr.as_path(&self.base); |
355 | if tokio::fs::try_exists(&path) |