Commit

Author: Kevin Schoon [me@kevinschoon.com]

Hash: 7156b7188bf7bae8d80caa58fa9aafe77d4e8344

Timestamp: Thu, 17 Apr 2025 16:10:45 +0000 (1 month ago)

+177 -30 +/-9 browse

add find support for storage
1diff --git a/Cargo.lock b/Cargo.lock
2index 3dfa26a..ef5e606 100644
3--- a/Cargo.lock
4+++ b/Cargo.lock
5 @@ -698,6 +698,7 @@ dependencies = [
6 "tracing",
7 "tracing-subscriber",
8 "uuid",
9+ "walkdir",
10 ]
11
12 [[package]]
13 @@ -850,6 +851,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
14 checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
15
16 [[package]]
17+ name = "same-file"
18+ version = "1.0.6"
19+ source = "registry+https://github.com/rust-lang/crates.io-index"
20+ checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
21+ dependencies = [
22+ "winapi-util",
23+ ]
24+
25+ [[package]]
26 name = "scopeguard"
27 version = "1.2.0"
28 source = "registry+https://github.com/rust-lang/crates.io-index"
29 @@ -1213,6 +1223,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
30 checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
31
32 [[package]]
33+ name = "walkdir"
34+ version = "2.5.0"
35+ source = "registry+https://github.com/rust-lang/crates.io-index"
36+ checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
37+ dependencies = [
38+ "same-file",
39+ "winapi-util",
40+ ]
41+
42+ [[package]]
43 name = "wasi"
44 version = "0.11.0+wasi-snapshot-preview1"
45 source = "registry+https://github.com/rust-lang/crates.io-index"
46 @@ -1244,6 +1264,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
47 checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
48
49 [[package]]
50+ name = "winapi-util"
51+ version = "0.1.9"
52+ source = "registry+https://github.com/rust-lang/crates.io-index"
53+ checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
54+ dependencies = [
55+ "windows-sys",
56+ ]
57+
58+ [[package]]
59 name = "winapi-x86_64-pc-windows-gnu"
60 version = "0.4.0"
61 source = "registry+https://github.com/rust-lang/crates.io-index"
62 diff --git a/Cargo.toml b/Cargo.toml
63index a3810c2..1348aac 100644
64--- a/Cargo.toml
65+++ b/Cargo.toml
66 @@ -27,6 +27,7 @@ hex-literal = "1.0.0"
67 base16ct = { version = "0.2.0", features = ["alloc"] }
68 base64 = "0.22.1"
69 askama = { version = "0.13.1", features = ["serde_json"], optional = true}
70+ walkdir = "2.5.0"
71
72 [dev-dependencies]
73 tokio = { version = "1.44.1", features = ["full"] }
74 diff --git a/src/address.rs b/src/address.rs
75index 342dde5..9f48d9d 100644
76--- a/src/address.rs
77+++ b/src/address.rs
78 @@ -23,7 +23,21 @@ fn digest_prefix(digest: &Digest) -> &str {
79 }
80 }
81
82- #[derive(Debug, PartialEq, Eq)]
83+ #[derive(Copy, Clone, Debug, PartialEq, Eq)]
84+ pub enum Kind {
85+ Tag,
86+ TagDirectory,
87+ Reference,
88+ TempBlob,
89+ Blob,
90+ BlobsRoot,
91+ ManifestRevision,
92+ LayerLink,
93+ Repository,
94+ RepositoriesRoot,
95+ }
96+
97+ #[derive(Debug, Hash, PartialEq, Eq)]
98 pub enum Address {
99 Tag {
100 namespace: Namespace,
101 @@ -43,6 +57,7 @@ pub enum Address {
102 Blob {
103 digest: Digest,
104 },
105+ BlobsRoot,
106 ManifestRevision {
107 namespace: Namespace,
108 digest: Digest,
109 @@ -54,6 +69,7 @@ pub enum Address {
110 Repository {
111 namespace: Namespace,
112 },
113+ RepositoriesRoot,
114 }
115
116 impl Display for Address {
117 @@ -71,10 +87,41 @@ impl Address {
118 pub fn is_link(&self) -> bool {
119 self.components().last().is_some_and(|part| part == "link")
120 }
121+
122 pub fn is_data(&self) -> bool {
123 self.components().last().is_some_and(|part| part == "data")
124 }
125
126+ pub fn kind(&self) -> Kind {
127+ match self {
128+ Address::Tag {
129+ namespace: _,
130+ name: _,
131+ } => Kind::Tag,
132+ Address::TagDirectory { namespace: _ } => Kind::TagDirectory,
133+ Address::Reference {
134+ namespace: _,
135+ digest: _,
136+ } => Kind::Reference,
137+ Address::TempBlob {
138+ uuid: _,
139+ namespace: _,
140+ } => Kind::TempBlob,
141+ Address::Blob { digest: _ } => Kind::Blob,
142+ Address::ManifestRevision {
143+ namespace: _,
144+ digest: _,
145+ } => Kind::ManifestRevision,
146+ Address::LayerLink {
147+ namespace: _,
148+ digest: _,
149+ } => Kind::LayerLink,
150+ Address::Repository { namespace: _ } => Kind::Repository,
151+ Address::BlobsRoot => Kind::BlobsRoot,
152+ Address::RepositoriesRoot => Kind::RepositoriesRoot,
153+ }
154+ }
155+
156 fn segment_ns(
157 parts: impl IntoIterator<Item = String>,
158 ) -> Result<(Namespace, Vec<String>), crate::error::Error> {
159 @@ -159,6 +206,8 @@ impl Address {
160 .split("/")
161 .map(|part| part.to_string()),
162 ),
163+ Address::BlobsRoot => vec![String::from("blobs")],
164+ Address::RepositoriesRoot => vec![String::from("repositories")],
165 }
166 }
167 }
168 @@ -171,6 +220,9 @@ impl FromStr for Address {
169 let first = split.pop_front().unwrap();
170 match first.as_str() {
171 "blobs" => {
172+ if split.is_empty() {
173+ return Ok(Address::BlobsRoot);
174+ }
175 // example: sha256/57/57f2ae062b76cff6f5a511fe6f907decfdefd6495e6afa31c44e0a6a1eca146f/data
176 let algorithm = split.pop_front().unwrap();
177 split.pop_front();
178 @@ -179,12 +231,15 @@ impl FromStr for Address {
179 Ok(Address::Blob { digest })
180 }
181 "repositories" => {
182+ if split.is_empty() {
183+ return Ok(Address::RepositoriesRoot);
184+ }
185 let (namespace, rest) = Address::segment_ns(split)?;
186 // println!("{} {:?}", namespace, rest);
187 if rest.len() == 1 && rest[0] == "_tags" {
188 // example: /{namespace}/tags
189 Ok(Address::TagDirectory { namespace })
190- } else if rest.len() > 2
191+ } else if rest.len() > 3
192 && rest[0] == "_tags"
193 && rest[2] == "current"
194 && rest[3] == "link"
195 @@ -287,6 +342,8 @@ mod test {
196 "repositories/hello/world///fuu/bar/tags",
197 "repositories/fuu/bar",
198 "repositories/fuu",
199+ "repositories",
200+ "blobs",
201 ]
202 .iter()
203 .for_each(|item| {
204 diff --git a/src/axum/mod.rs b/src/axum/mod.rs
205index b44f1c8..f7f09d1 100644
206--- a/src/axum/mod.rs
207+++ b/src/axum/mod.rs
208 @@ -25,7 +25,7 @@ mod paths;
209 pub mod web;
210
211 #[derive(Clone)]
212- pub(crate) struct AppState {
213+ pub struct AppState {
214 pub oci: OciInterface,
215 }
216
217 diff --git a/src/axum/web/router.rs b/src/axum/web/router.rs
218index 9d3012d..e565c4e 100644
219--- a/src/axum/web/router.rs
220+++ b/src/axum/web/router.rs
221 @@ -2,6 +2,7 @@ use std::sync::Arc;
222
223 use axum::{
224 Extension, Router,
225+ extract::State,
226 response::{Html, Response},
227 routing::get,
228 };
229 @@ -26,11 +27,17 @@ pub async fn stylesheet() -> Response {
230 res
231 }
232
233- pub async fn index() -> Result<Html<String>, crate::axum::error::Error> {
234+ pub async fn index(
235+ State(state): State<Arc<AppState>>,
236+ ) -> Result<Html<String>, crate::axum::error::Error> {
237 let template = RepositoryIndex {
238 title: "Repositories",
239 repositories: vec![String::from("Hello")],
240 };
241+ let namespaces = state.oci.list_namespaces().await?;
242+ namespaces.iter().for_each(|ns| {
243+ println!("NS: {}", ns);
244+ });
245 Ok(Html::from(template.to_string()))
246 }
247
248 diff --git a/src/namespace.rs b/src/namespace.rs
249index b06f587..45019a9 100644
250--- a/src/namespace.rs
251+++ b/src/namespace.rs
252 @@ -11,7 +11,7 @@ use crate::error::Error;
253 const NAME_REGEXP_MATCH: &str = r"^[a-z0-9]+(?:[._-][a-z0-9]+)*";
254
255 // TODO: Consider 255 char namespace limit - hostname length per spec docs
256- #[derive(Clone, Debug, PartialEq, Eq)]
257+ #[derive(Clone, Debug, Hash, PartialEq, Eq)]
258 pub struct Namespace(Vec<String>);
259
260 impl Namespace {
261 diff --git a/src/oci_interface.rs b/src/oci_interface.rs
262index d9908d1..09df8e2 100644
263--- a/src/oci_interface.rs
264+++ b/src/oci_interface.rs
265 @@ -11,7 +11,7 @@ use uuid::Uuid;
266
267 use crate::{
268 Namespace, TagOrDigest,
269- address::Address,
270+ address::{Address, Kind},
271 error::Error,
272 storage::{InnerStream, StorageIface},
273 };
274 @@ -336,7 +336,7 @@ impl OciInterface {
275 .enumerate()
276 .find_map(|(i, name)| if **name == tag_name { Some(i) } else { None })
277 {
278- items.split_at(last_pos+1).1
279+ items.split_at(last_pos + 1).1
280 } else {
281 items.as_slice()
282 }
283 @@ -356,4 +356,19 @@ impl OciInterface {
284 Ok(tag_list.build().unwrap())
285 }
286 }
287+
288+ pub async fn list_namespaces(&self) -> Result<Vec<Namespace>, Error> {
289+ let results = self
290+ .storage
291+ .find(&Address::RepositoriesRoot, Some(Kind::Repository))
292+ .await
293+ .map_err(Error::Storage)?;
294+ let namespaces: Vec<Namespace> = results.iter().filter_map(|addr| {
295+ match addr {
296+ Address::Repository { namespace } => Some(namespace.clone()),
297+ _ => None
298+ }
299+ }).collect();
300+ Ok(namespaces)
301+ }
302 }
303 diff --git a/src/storage.rs b/src/storage.rs
304index 211d04f..ef19917 100644
305--- a/src/storage.rs
306+++ b/src/storage.rs
307 @@ -1,9 +1,13 @@
308- use std::{io::Error as IoError, path::PathBuf, pin::Pin};
309+ use std::{collections::HashSet, io::Error as IoError, path::PathBuf, pin::Pin};
310
311 use bytes::Bytes;
312 use futures::{Stream, stream::BoxStream};
313
314- use crate::{address::Address, storage_fs::FileSystem};
315+ use crate::{
316+ Namespace,
317+ address::{Address, Kind},
318+ storage_fs::FileSystem,
319+ };
320
321 #[derive(thiserror::Error, Debug)]
322 pub enum Error {
323 @@ -43,21 +47,22 @@ impl Stream for InnerStream {
324 pub trait StorageIface: Sync + Send {
325 /// List a single directory of objects
326 async fn list(&self, addr: &Address) -> Result<Vec<String>, Error>;
327- // async fn stat(&self, addr: &Address) -> Result<Option<Object>, Error>;
328- // async fn read_bytes(&self, addr: &Address) -> Result<Option<Vec<u8>>, Error>;
329 /// Check if an object exists at the given address
330- async fn exists<'a>(&self, path: &Address) -> Result<bool, Error>;
331+ async fn exists(&self, path: &Address) -> Result<bool, Error>;
332 /// Write bytes to the address, truncating any existing object
333- async fn write_all<'a>(&self, path: &Address, bytes: &[u8]) -> Result<(), Error>;
334-
335- /// write bytes to a file that has already been created
336- async fn write<'a>(&self, path: &Address, bytes: &[u8]) -> Result<(), Error>;
337-
338- async fn mv<'a>(&self, src: &Address, dst: &Address) -> Result<(), Error>;
339- // fn mv (&self, )std::future::Future<Output = ()> + Send
340- async fn read<'a>(&self, src: &Address) -> Result<InnerStream, Error>;
341- async fn read_bytes<'a>(&self, src: &Address) -> Result<Bytes, Error>;
342- async fn delete<'a>(&self, src: &Address) -> Result<(), Error>;
343+ async fn write_all(&self, path: &Address, bytes: &[u8]) -> Result<(), Error>;
344+ /// Write bytes to a file that has already been created
345+ async fn write(&self, path: &Address, bytes: &[u8]) -> Result<(), Error>;
346+ /// Move a file from one address to another
347+ async fn mv(&self, src: &Address, dst: &Address) -> Result<(), Error>;
348+ /// Read an address to a stream
349+ async fn read(&self, src: &Address) -> Result<InnerStream, Error>;
350+ /// Read an address into memory
351+ async fn read_bytes(&self, src: &Address) -> Result<Bytes, Error>;
352+ /// Delete an object at the address
353+ async fn delete(&self, src: &Address) -> Result<(), Error>;
354+ /// Find objects in the underlying storage
355+ async fn find(&self, start: &Address, filter: Option<Kind>) -> Result<HashSet<Address>, Error>;
356 }
357
358 #[derive(Debug, Clone)]
359 diff --git a/src/storage_fs.rs b/src/storage_fs.rs
360index fac64a4..0c9bd4a 100644
361--- a/src/storage_fs.rs
362+++ b/src/storage_fs.rs
363 @@ -1,14 +1,18 @@
364 use std::{
365+ collections::HashSet,
366 io::Cursor,
367 path::{Path, PathBuf},
368+ str::FromStr,
369 };
370
371 use bytes::Bytes;
372 use futures::StreamExt;
373 use tokio::io::AsyncWriteExt;
374+ use walkdir::WalkDir;
375
376 use crate::{
377- address::Address,
378+ Namespace,
379+ address::{Address, Kind},
380 storage::{Error, InnerStream, StorageIface},
381 };
382
383 @@ -32,7 +36,7 @@ impl FileSystem {
384
385 #[async_trait::async_trait]
386 impl StorageIface for FileSystem {
387- async fn exists<'a>(&self, addr: &Address) -> Result<bool, Error> {
388+ async fn exists(&self, addr: &Address) -> Result<bool, Error> {
389 let path = addr.path(&self.base);
390 if tokio::fs::try_exists(&path)
391 .await
392 @@ -45,7 +49,7 @@ impl StorageIface for FileSystem {
393 }
394 }
395
396- async fn write_all<'a>(&self, addr: &Address, bytes: &[u8]) -> Result<(), Error> {
397+ async fn write_all(&self, addr: &Address, bytes: &[u8]) -> Result<(), Error> {
398 let path = addr.path(&self.base);
399 self.ensure_dir(&path).await?;
400 let mut fp = tokio::fs::OpenOptions::new()
401 @@ -61,7 +65,7 @@ impl StorageIface for FileSystem {
402 Ok(())
403 }
404
405- async fn write<'a>(&self, addr: &Address, bytes: &[u8]) -> Result<(), Error> {
406+ async fn write(&self, addr: &Address, bytes: &[u8]) -> Result<(), Error> {
407 let path = addr.path(&self.base);
408 let mut fp = tokio::fs::OpenOptions::new()
409 .create(false)
410 @@ -76,7 +80,7 @@ impl StorageIface for FileSystem {
411 Ok(())
412 }
413
414- async fn mv<'a>(&self, src: &Address, dst: &Address) -> Result<(), Error> {
415+ async fn mv(&self, src: &Address, dst: &Address) -> Result<(), Error> {
416 let src_path = src.path(&self.base);
417 let dst_path = dst.path(&self.base);
418 self.ensure_dir(&dst_path).await?;
419 @@ -85,7 +89,7 @@ impl StorageIface for FileSystem {
420 Ok(())
421 }
422
423- async fn read<'a>(&self, src: &Address) -> Result<InnerStream, Error> {
424+ async fn read(&self, src: &Address) -> Result<InnerStream, Error> {
425 let path = src.path(&self.base);
426 let fp = tokio::fs::File::open(path.as_path())
427 .await
428 @@ -97,7 +101,7 @@ impl StorageIface for FileSystem {
429 Ok(InnerStream::new(stream.boxed()))
430 }
431
432- async fn read_bytes<'a>(&self, src: &Address) -> Result<Bytes, Error> {
433+ async fn read_bytes(&self, src: &Address) -> Result<Bytes, Error> {
434 let path = src.path(&self.base);
435 let payload = tokio::fs::read(path.as_path())
436 .await
437 @@ -108,7 +112,7 @@ impl StorageIface for FileSystem {
438 Ok(Bytes::from(payload))
439 }
440
441- async fn delete<'a>(&self, src: &Address) -> Result<(), Error> {
442+ async fn delete(&self, src: &Address) -> Result<(), Error> {
443 let path = src.path(&self.base);
444 tokio::fs::remove_file(path.as_path()).await?;
445 Ok(())
446 @@ -126,4 +130,33 @@ impl StorageIface for FileSystem {
447 names.sort();
448 Ok(names)
449 }
450+
451+ async fn find(&self, start: &Address, filter: Option<Kind>) -> Result<HashSet<Address>, Error> {
452+ let base_dir = self.base.clone();
453+ let start_dir = start.path(&self.base);
454+ let handle = tokio::spawn(async move {
455+ WalkDir::new(start_dir.as_path())
456+ .into_iter()
457+ .filter_map(|entry| entry.ok())
458+ .fold(HashSet::new(), |mut accm, entry| {
459+ if entry.path().eq(&start_dir) {
460+ return accm;
461+ };
462+ let addr_path = entry.path().strip_prefix(base_dir.as_path()).unwrap();
463+ let addr_path_str = addr_path.to_str().unwrap();
464+ if let Ok(addr) = Address::from_str(addr_path_str) {
465+ if let Some(filter) = filter {
466+ if filter == addr.kind() {
467+ accm.insert(addr);
468+ }
469+ } else {
470+ accm.insert(addr);
471+ }
472+ };
473+ accm
474+ })
475+ });
476+ let results = handle.await.unwrap();
477+ Ok(results)
478+ }
479 }