Commit

Author:

Hash:

Timestamp:

+69 -153 +/-4 browse

Kevin Schoon [me@kevinschoon.com]

c27765405e2f7bf7e8f2d33d2c301023f3b8450f

Fri, 04 Apr 2025 15:02:56 +0000 (4 months ago)

refactor fs with addresses
1diff --git a/src/handlers.rs b/src/handlers.rs
2index 60fbdbe..617f847 100644
3--- a/src/handlers.rs
4+++ b/src/handlers.rs
5 @@ -2,13 +2,11 @@ use std::str::FromStr;
6
7 use axum::{
8 Extension, Json,
9- body::BodyDataStream,
10 extract::{Path, Query, Request, State},
11 http::StatusCode,
12 response::Response,
13 };
14- use bytes::Bytes;
15- use futures::{Stream, StreamExt, TryStreamExt};
16+ use futures::TryStreamExt;
17 use http::header::CONTENT_TYPE;
18 use oci_spec::{
19 distribution::{Reference, TagList},
20 @@ -50,7 +48,7 @@ pub async fn stat_blob(
21 ) -> Result<StatusCode, Error> {
22 let _ = Namespace::from_str(&name)?;
23 let digest = Digest::from_str(&digest)?;
24- if !state.oci.exists(&digest).await? {
25+ if !state.oci.has_blob(&digest).await? {
26 Ok(StatusCode::NOT_FOUND)
27 } else {
28 Ok(StatusCode::OK)
29 @@ -83,7 +81,6 @@ pub struct UploadQuery {
30 pub digest: String,
31 }
32
33- #[axum::debug_handler]
34 pub async fn write_blob(
35 State(state): State<AppState>,
36 Path(upload_uuid): Path<String>,
37 diff --git a/src/oci_interface.rs b/src/oci_interface.rs
38index bf94810..aca106d 100644
39--- a/src/oci_interface.rs
40+++ b/src/oci_interface.rs
41 @@ -11,8 +11,9 @@ use uuid::Uuid;
42
43 use crate::{
44 Namespace,
45+ address::{Address, Blob, TempBlob},
46 error::Error,
47- storage::{Address, Storage},
48+ storage::Storage,
49 };
50
51 pub mod paths {
52 @@ -32,7 +33,7 @@ impl OciInterface {
53 pub async fn new_blob(&self, namespace: &Namespace) -> Result<Uuid, Error> {
54 let uuid = Uuid::new_v4();
55 self.storage
56- .write_all(&Address::BlobInit { uuid: &uuid }, &[])
57+ .write_all(&TempBlob::from(&uuid).into(), &[])
58 .await
59 .map_err(Error::Storage)?;
60 Ok(uuid)
61 @@ -47,24 +48,27 @@ impl OciInterface {
62 where
63 S: Stream<Item = Result<Bytes, Error>>,
64 {
65- let tmp_blob = &Address::WriteBlob { uuid };
66+ let tmp_blob: Address = TempBlob::from(uuid).into();
67 while let Some(item) = stream.next().await {
68 let chunk = item?.to_vec();
69 self.storage
70- .write(tmp_blob, chunk.as_slice())
71+ .write(&tmp_blob, chunk.as_slice())
72 .await
73 .map_err(Error::Storage)?;
74 }
75- let dst_path = &Address::BlobSpec { digest };
76+ let blob: Address = Blob::from(digest).into();
77 self.storage
78- .mv(tmp_blob, dst_path)
79+ .mv(&tmp_blob, &blob.data())
80 .await
81 .map_err(Error::Storage)?;
82 Ok(())
83 }
84
85- pub async fn write_manifest(&self, namespace: &Namespace, manifest: &ImageManifest) -> Result<(), Error> {
86- let serialized = manifest.to_string().unwrap();
87+ pub async fn write_manifest(
88+ &self,
89+ namespace: &Namespace,
90+ manifest: &ImageManifest,
91+ ) -> Result<(), Error> {
92 todo!()
93 }
94
95 @@ -73,25 +77,34 @@ impl OciInterface {
96 namespace: &Namespace,
97 reference: &Reference,
98 ) -> Result<ImageManifest, Error> {
99- let bytes = self
100- .storage
101- .read_bytes(&Address::Manifest {
102- namespace,
103- reference,
104- })
105- .await
106- .map_err(Error::Storage)?
107- .unwrap();
108- let manifest: ImageManifest = serde_json::from_slice(bytes.as_slice()).unwrap();
109- Ok(manifest)
110+ todo!()
111+ // let bytes = self
112+ // .storage
113+ // .read_bytes(&Address::Manifest {
114+ // namespace,
115+ // reference,
116+ // })
117+ // .await
118+ // .map_err(Error::Storage)?
119+ // .unwrap();
120+ // let manifest: ImageManifest = serde_json::from_slice(bytes.as_slice()).unwrap();
121+ // Ok(manifest)
122 }
123
124- pub async fn exists(&self, digest: &Digest) -> Result<bool, Error> {
125- Ok(self
126- .storage
127- .stat(&Address::BlobSpec { digest })
128+ pub async fn has_blob(&self, digest: &Digest) -> Result<bool, Error> {
129+ let address: Address = Blob::from(digest).into();
130+ self.storage
131+ .exists(&address.data())
132 .await
133- .map_err(Error::Storage)?
134- .is_some())
135+ .map_err(Error::Storage)
136 }
137+
138+ // pub async fn exists(&self, digest: &Digest) -> Result<bool, Error> {
139+ // Ok(self
140+ // .storage
141+ // .stat(&Address::BlobSpec { digest })
142+ // .await
143+ // .map_err(Error::Storage)?
144+ // .is_some())
145+ // }
146 }
147 diff --git a/src/storage.rs b/src/storage.rs
148index 8539feb..88339a8 100644
149--- a/src/storage.rs
150+++ b/src/storage.rs
151 @@ -1,14 +1,6 @@
152- use std::{
153- io::Error as IoError,
154- path::{Path, PathBuf},
155- pin::Pin,
156- };
157+ use std::{io::Error as IoError, path::Path};
158
159- use bytes::Bytes;
160- use futures::Stream;
161- use oci_spec::{distribution::Reference, image::Digest};
162-
163- use crate::Namespace;
164+ use crate::address::Address;
165
166 #[derive(thiserror::Error, Debug)]
167 pub enum Error {
168 @@ -20,98 +12,20 @@ pub enum Error {
169 NotFound,
170 }
171
172- pub struct Object;
173-
174- /// Address normalizes parameters into a general path like object which can
175- /// be used for retrieval of objects from the storage implementation.
176- pub enum Address<'a> {
177- TagPath {
178- namespace: &'a Namespace,
179- },
180- Manifest {
181- namespace: &'a Namespace,
182- reference: &'a Reference,
183- },
184- BlobSpec {
185- digest: &'a Digest,
186- },
187- BlobInit {
188- uuid: &'a uuid::Uuid,
189- },
190- WriteBlob {
191- uuid: &'a uuid::Uuid,
192- },
193- MoveBlob {
194- uuid: &'a uuid::Uuid,
195- digest: &'a Digest,
196- },
197- }
198-
199- impl Address<'_> {
200- /// return the relative path for a certain object or directory of objects
201- pub fn path(&self) -> PathBuf {
202- match self {
203- Address::TagPath { namespace } => {
204- Path::new(&format!("repositories/{}/tags", namespace)).to_path_buf()
205- }
206- Address::BlobSpec { digest } => {
207- let digest_str = digest.digest();
208- let first_two: String = digest_str.chars().take(2).collect();
209- match digest.algorithm() {
210- oci_spec::image::DigestAlgorithm::Sha256 => Path::new(&format!(
211- "repositories/blobs/sha256/{}/{}/data",
212- first_two, digest_str
213- ))
214- .to_path_buf(),
215- oci_spec::image::DigestAlgorithm::Sha384 => todo!(),
216- oci_spec::image::DigestAlgorithm::Sha512 => todo!(),
217- oci_spec::image::DigestAlgorithm::Other(_) => todo!(),
218- _ => todo!(),
219- }
220- }
221- Address::Manifest {
222- namespace,
223- reference,
224- } => {
225- todo!()
226- }
227- Address::BlobInit { uuid } => {
228- Path::new(&format!("repositories/temp/{}", uuid)).to_path_buf()
229- }
230- Address::WriteBlob { uuid } => {
231- Path::new(&format!("repositories/temp/{}", uuid)).to_path_buf()
232- }
233- Address::MoveBlob { uuid, digest } => {
234- let digest_str = digest.digest();
235- let first_two: String = digest_str.chars().take(2).collect();
236- match digest.algorithm() {
237- oci_spec::image::DigestAlgorithm::Sha256 => Path::new(&format!(
238- "repositories/blobs/sha256/{}/{}/data",
239- first_two, digest_str
240- ))
241- .to_path_buf(),
242- oci_spec::image::DigestAlgorithm::Sha384 => todo!(),
243- oci_spec::image::DigestAlgorithm::Sha512 => todo!(),
244- oci_spec::image::DigestAlgorithm::Other(_) => todo!(),
245- _ => todo!(),
246- }
247- },
248- }
249- }
250- }
251-
252 /// The storage trait needs to be implemented for accessing objects from the
253 /// platform. This API is based on registry/storage/driver/storagedriver.go in
254 /// the distribution codebase.
255 #[async_trait::async_trait]
256 pub trait Storage: Sync + Send {
257 /// List a single directory of objects
258- async fn list(&self, addr: &Address) -> Result<Vec<Object>, Error>;
259- async fn stat(&self, addr: &Address) -> Result<Option<Object>, Error>;
260- async fn read_bytes(&self, addr: &Address) -> Result<Option<Vec<u8>>, Error>;
261+ // async fn list(&self, addr: &Address) -> Result<Vec<Object>, Error>;
262+ // async fn stat(&self, addr: &Address) -> Result<Option<Object>, Error>;
263+ // async fn read_bytes(&self, addr: &Address) -> Result<Option<Vec<u8>>, Error>;
264+ /// Check if an object exists at the given address
265+ async fn exists(&self, path: &Address) -> Result<bool, Error>;
266 /// Write bytes to the address, truncating any existing object
267- async fn write_all(&self, addr: &Address, bytes: &[u8]) -> Result<(), Error>;
268+ async fn write_all(&self, path: &Address, bytes: &[u8]) -> Result<(), Error>;
269 /// write bytes to a file that has already been created
270- async fn write(&self, addr: &Address, bytes: &[u8]) -> Result<(), Error>;
271+ async fn write(&self, path: &Address, bytes: &[u8]) -> Result<(), Error>;
272 async fn mv(&self, src: &Address, dst: &Address) -> Result<(), Error>;
273 }
274 diff --git a/src/storage_fs.rs b/src/storage_fs.rs
275index 460bfb5..16a9894 100644
276--- a/src/storage_fs.rs
277+++ b/src/storage_fs.rs
278 @@ -1,14 +1,14 @@
279 use std::{
280 io::Cursor,
281 path::{Path, PathBuf},
282- pin::Pin,
283 };
284
285- use bytes::Bytes;
286- use futures::Stream;
287 use tokio::io::AsyncWriteExt;
288
289- use crate::storage::{Address, Error, Object, Storage};
290+ use crate::{
291+ address::Address,
292+ storage::{Error, Storage},
293+ };
294
295 #[derive(Clone)]
296 pub struct FileSystem {
297 @@ -36,38 +36,30 @@ impl FileSystem {
298
299 #[async_trait::async_trait]
300 impl Storage for FileSystem {
301- async fn list(&self, addr: &Address) -> Result<Vec<Object>, Error> {
302- todo!()
303- }
304-
305- async fn stat(&self, addr: &Address) -> Result<Option<Object>, Error> {
306- let path = self.base.join(addr.path());
307+ async fn exists(&self, addr: &Address) -> Result<bool, Error> {
308+ let path = addr.as_path(&self.base);
309 if tokio::fs::try_exists(&path)
310 .await
311 .is_ok_and(|exists| exists)
312 {
313 let md = tokio::fs::metadata(path).await?;
314- if md.is_file() {
315- Ok(Some(Object))
316- } else {
317- Ok(None)
318- }
319+ if md.is_file() { Ok(true) } else { Ok(false) }
320 } else {
321- Ok(None)
322+ Ok(false)
323 }
324 }
325
326- async fn read_bytes(&self, addr: &Address) -> Result<Option<Vec<u8>>, Error> {
327- if self.stat(addr).await?.is_none() {
328- return Ok(None);
329- }
330- let path = addr.path();
331- let bytes = tokio::fs::read(&path).await?;
332- Ok(Some(bytes))
333- }
334+ /// async fn read_bytes(&self, addr: &Address) -> Result<Option<Vec<u8>>, Error> {
335+ /// if self.stat(addr).await?.is_none() {
336+ /// return Ok(None);
337+ /// }
338+ /// let path = addr.path();
339+ /// let bytes = tokio::fs::read(&path).await?;
340+ /// Ok(Some(bytes))
341+ /// }
342
343 async fn write_all(&self, addr: &Address, bytes: &[u8]) -> Result<(), Error> {
344- let path = addr.path();
345+ let path = addr.as_path(&self.base);
346 self.ensure_dir(&path).await?;
347 let mut fp = tokio::fs::OpenOptions::new()
348 .create(true)
349 @@ -82,7 +74,7 @@ impl Storage for FileSystem {
350 }
351
352 async fn write(&self, addr: &Address, bytes: &[u8]) -> Result<(), Error> {
353- let path = addr.path();
354+ let path = addr.as_path(&self.base);
355 let mut fp = tokio::fs::OpenOptions::new()
356 .create(false)
357 .truncate(false)
358 @@ -96,8 +88,8 @@ impl Storage for FileSystem {
359 }
360
361 async fn mv(&self, src: &Address, dst: &Address) -> Result<(), Error> {
362- let src_path = src.path();
363- let dst_path = dst.path();
364+ let src_path = src.as_path(&self.base);
365+ let dst_path = dst.as_path(&self.base);
366 self.ensure_dir(&dst_path).await?;
367 tracing::info!("mv {:?} -> {:?}", src_path, dst_path);
368 tokio::fs::rename(src_path, dst_path).await?;