Commit

Author:

Hash:

Timestamp:

+201 -82 +/-10 browse

Kevin Schoon [me@kevinschoon.com]

00247a62c414dfdbb041093d9acfc98460063433

Sat, 05 Apr 2025 15:38:46 +0000 (4 months ago)

mostly wrap up push operations
1diff --git a/Cargo.lock b/Cargo.lock
2index a1fce97..f20db56 100644
3--- a/Cargo.lock
4+++ b/Cargo.lock
5 @@ -565,6 +565,7 @@ dependencies = [
6 "http",
7 "oci-spec",
8 "regex",
9+ "relative-path",
10 "serde",
11 "serde_json",
12 "thiserror",
13 @@ -696,6 +697,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
14 checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
15
16 [[package]]
17+ name = "relative-path"
18+ version = "1.9.3"
19+ source = "registry+https://github.com/rust-lang/crates.io-index"
20+ checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
21+
22+ [[package]]
23 name = "rustc-demangle"
24 version = "0.1.24"
25 source = "registry+https://github.com/rust-lang/crates.io-index"
26 diff --git a/Cargo.toml b/Cargo.toml
27index 2677521..55c172c 100644
28--- a/Cargo.toml
29+++ b/Cargo.toml
30 @@ -18,6 +18,7 @@ uuid = { version = "1.16.0", features = ["v4"] }
31 http = "1.3.1"
32 tower = { version = "0.5.2", features = ["util"] }
33 bytes = "1.10.1"
34+ relative-path = "1.9.3"
35
36 [dev-dependencies]
37 tokio = { version = "1.44.1", features = ["full"] }
38 diff --git a/src/address.rs b/src/address.rs
39index ea16ed4..96edd5f 100644
40--- a/src/address.rs
41+++ b/src/address.rs
42 @@ -4,12 +4,17 @@ use std::{
43 };
44
45 use oci_spec::image::Digest;
46+ use relative_path::RelativePath;
47 use uuid::Uuid;
48
49 use crate::Namespace;
50
51 const SEPARATOR: &str = "/";
52
53+ pub trait Addressable {
54+ fn parts(&self) -> Vec<String>;
55+ }
56+
57 /// Address is a path-like object for addressing OCI objects. The design of
58 /// this basically copies the file system layout used by the Docker
59 /// distribution registry. https://github.com/distribution/distribution
60 @@ -21,18 +26,25 @@ pub struct Address {
61 impl Address {
62 pub fn as_path(&self, base_dir: &Path) -> PathBuf {
63 let parts = self.parts.join(SEPARATOR);
64- let path = Path::new(&parts);
65- base_dir.join(path)
66+ RelativePath::new(&parts).to_path(base_dir)
67 }
68
69- pub fn link(&self) -> Self {
70- let mut parts = self.parts.clone();
71+ pub fn new(addr: &impl Addressable) -> Self {
72+ Address {
73+ parts: addr.parts(),
74+ }
75+ }
76+
77+ /// Create an addressable link
78+ pub fn link(addr: &impl Addressable) -> Self {
79+ let mut parts = addr.parts();
80 parts.push(String::from("link"));
81 Address { parts }
82 }
83
84- pub fn data(&self) -> Self {
85- let mut parts = self.parts.clone();
86+ /// create an addressable data path
87+ pub fn data(addr: &impl Addressable) -> Self {
88+ let mut parts = addr.parts();
89 parts.push(String::from("data"));
90 Address { parts }
91 }
92 @@ -49,17 +61,15 @@ pub struct TagDirectory<'a> {
93 pub namespace: &'a Namespace,
94 }
95
96- impl<'a> From<&'a Namespace> for TagDirectory<'a> {
97- fn from(value: &'a Namespace) -> Self {
98- TagDirectory { namespace: value }
99+ impl Addressable for TagDirectory<'_> {
100+ fn parts(&self) -> Vec<String> {
101+ vec![self.namespace.to_string(), String::from("tags")]
102 }
103 }
104
105- impl From<TagDirectory<'_>> for Address {
106- fn from(value: TagDirectory<'_>) -> Self {
107- Address {
108- parts: vec![value.namespace.to_string(), String::from("tags")],
109- }
110+ impl<'a> From<&'a Namespace> for TagDirectory<'a> {
111+ fn from(value: &'a Namespace) -> Self {
112+ TagDirectory { namespace: value }
113 }
114 }
115
116 @@ -69,15 +79,13 @@ pub struct Tag<'a> {
117 pub name: &'a str,
118 }
119
120- impl From<Tag<'_>> for Address {
121- fn from(value: Tag<'_>) -> Self {
122- Address {
123- parts: vec![
124- value.namespace.to_string(),
125- String::from("tags"),
126- value.name.to_string(),
127- ],
128- }
129+ impl Addressable for Tag<'_> {
130+ fn parts(&self) -> Vec<String> {
131+ vec![
132+ self.namespace.to_string(),
133+ String::from("tags"),
134+ self.name.to_string(),
135+ ]
136 }
137 }
138
139 @@ -92,11 +100,9 @@ impl<'a> From<&'a Uuid> for TempBlob<'a> {
140 }
141 }
142
143- impl From<TempBlob<'_>> for Address {
144- fn from(value: TempBlob<'_>) -> Self {
145- Address {
146- parts: vec![String::from("tmp"), value.uuid.to_string()],
147- }
148+ impl Addressable for TempBlob<'_> {
149+ fn parts(&self) -> Vec<String> {
150+ vec![String::from("tmp"), self.uuid.to_string()]
151 }
152 }
153
154 @@ -111,11 +117,11 @@ impl<'a> From<&'a Digest> for Blob<'a> {
155 }
156 }
157
158- impl From<Blob<'_>> for Address {
159- fn from(value: Blob<'_>) -> Self {
160- let digest_str = value.digest.digest();
161+ impl Addressable for Blob<'_> {
162+ fn parts(&self) -> Vec<String> {
163+ let digest_str = self.digest.digest();
164 let first_two: String = digest_str.chars().take(2).collect();
165- let parts = match value.digest.algorithm() {
166+ match self.digest.algorithm() {
167 oci_spec::image::DigestAlgorithm::Sha256 => vec![
168 String::from("blobs"),
169 String::from("sha256"),
170 @@ -126,8 +132,7 @@ impl From<Blob<'_>> for Address {
171 oci_spec::image::DigestAlgorithm::Sha512 => todo!(),
172 oci_spec::image::DigestAlgorithm::Other(_) => todo!(),
173 _ => todo!(),
174- };
175- Address { parts }
176+ }
177 }
178 }
179
180 @@ -137,9 +142,14 @@ pub struct Manifest<'a> {
181 pub digest: &'a Digest,
182 }
183
184- impl From<Manifest<'_>> for Address {
185- fn from(value: Manifest<'_>) -> Self {
186- todo!()
187+ impl Addressable for Manifest<'_> {
188+ fn parts(&self) -> Vec<String> {
189+ vec![
190+ self.namespace.to_string(),
191+ String::from("manifests"),
192+ self.digest.algorithm().to_string(),
193+ self.digest.digest().to_string(),
194+ ]
195 }
196 }
197
198 @@ -152,9 +162,9 @@ mod test {
199 #[test]
200 pub fn addresses() {
201 let namespace = Namespace::from_str("hello/world").unwrap();
202- assert!(Address::from(TagDirectory::from(&namespace)).to_string() == "hello/world/tags");
203+ assert!(Address::new(&TagDirectory::from(&namespace)).to_string() == "hello/world/tags");
204 assert!(
205- Address::from(Tag {
206+ Address::new(&Tag {
207 namespace: &namespace,
208 name: "latest"
209 })
210 @@ -162,14 +172,22 @@ mod test {
211 == "hello/world/tags/latest"
212 );
213 let uuid = Uuid::new_v4();
214- assert!(Address::from(TempBlob::from(&uuid)).to_string() == format!("tmp/{}", uuid));
215+ assert!(Address::new(&TempBlob::from(&uuid)).to_string() == format!("tmp/{}", uuid));
216 let digest = Digest::from_str(
217 "sha256:57f2ae062b76cff6f5a511fe6f907decfdefd6495e6afa31c44e0a6a1eca146f",
218 )
219 .unwrap();
220 assert!(
221- Address::from(Blob { digest: &digest }).to_string()
222+ Address::new(&Blob { digest: &digest }).to_string()
223 == "blobs/sha256/57/57f2ae062b76cff6f5a511fe6f907decfdefd6495e6afa31c44e0a6a1eca146f"
224+ );
225+ assert!(
226+ Address::new(&Manifest {
227+ namespace: &namespace,
228+ digest: &digest
229+ })
230+ .to_string()
231+ == "hello/world/manifests/sha256/57f2ae062b76cff6f5a511fe6f907decfdefd6495e6afa31c44e0a6a1eca146f"
232 )
233 }
234 }
235 diff --git a/src/error.rs b/src/error.rs
236index 495f30b..ddc49ec 100644
237--- a/src/error.rs
238+++ b/src/error.rs
239 @@ -1,4 +1,5 @@
240 use axum::{Json, response::IntoResponse};
241+ use http::StatusCode;
242
243 use crate::storage::Error as StorageError;
244
245 @@ -99,6 +100,17 @@ impl From<&Error> for Message {
246 impl IntoResponse for Error {
247 fn into_response(self) -> axum::response::Response {
248 let message = Message::from(&self);
249- Json(message).into_response()
250+ let mut res = Json(message).into_response();
251+ let status_code = res.status_mut();
252+ *status_code = StatusCode::INTERNAL_SERVER_ERROR;
253+ tracing::error!("Server failure: {:?}", self);
254+ // match self {
255+ // Error::Code(code) => todo!(),
256+ // Error::Storage(error) => todo!(),
257+ // Error::OciInternal(oci_spec_error) => todo!(),
258+ // Error::OciParsing(parse_error) => todo!(),
259+ // Error::Stream(_) => todo!(),
260+ // }
261+ res
262 }
263 }
264 diff --git a/src/handlers.rs b/src/handlers.rs
265index 1a1648e..fe33325 100644
266--- a/src/handlers.rs
267+++ b/src/handlers.rs
268 @@ -9,7 +9,10 @@ use axum::{
269 };
270 use bytes::{Buf, Bytes};
271 use futures::TryStreamExt;
272- use http::header::CONTENT_TYPE;
273+ use http::{
274+ HeaderValue,
275+ header::{CONTENT_LENGTH, CONTENT_TYPE},
276+ };
277 use oci_spec::{
278 distribution::{Reference, TagList},
279 image::{Digest, ImageManifest, MediaType},
280 @@ -84,19 +87,16 @@ where
281 }
282 }
283
284- /// NOTE: The registry MUST store the manifest in the exact byte representation
285- /// provided by the client.
286- #[axum::debug_handler]
287 pub async fn write_manifest(
288 Extension(namespace): Extension<Namespace>,
289 State(state): State<AppState>,
290- Path(digest): Path<String>,
291+ Path(reference): Path<String>,
292 ManifestExtractor((manifest_bytes, manifest)): ManifestExtractor,
293 ) -> Result<StatusCode, Error> {
294- let digest = Digest::from_str(&digest)?;
295+ let digest = manifest.config().digest();
296 state
297 .oci
298- .write_manifest(&namespace, &digest, &manifest, &manifest_bytes)
299+ .write_manifest(&namespace, digest, &reference, &manifest_bytes)
300 .await?;
301 Ok(StatusCode::OK)
302 }
303 @@ -106,6 +106,7 @@ pub struct UploadQuery {
304 pub digest: String,
305 }
306
307+ /// Write a complete blob in one request
308 pub async fn write_blob(
309 State(state): State<AppState>,
310 Path(upload_uuid): Path<String>,
311 @@ -117,23 +118,72 @@ pub async fn write_blob(
312 .map_err(|_| Error::Code(crate::error::Code::BlobUploadInvalid))?;
313 let stream = req.into_body().into_data_stream();
314 let stream = stream.map_err(|e| Error::Stream(e.to_string()));
315- state
316- .oci
317- .write_blob(Box::pin(stream), &uuid, &digest)
318- .await?;
319+ state.oci.write_chunk(Box::pin(stream), &uuid).await?;
320+ state.oci.commit_blob(&uuid, &digest).await?;
321 Ok(StatusCode::OK)
322 }
323
324+ pub async fn write_blob_chunk(
325+ State(state): State<AppState>,
326+ Path(upload_uuid): Path<String>,
327+ req: Request,
328+ ) -> Result<Response, Error> {
329+ let uuid = Uuid::from_str(&upload_uuid)
330+ .map_err(|_| Error::Code(crate::error::Code::BlobUploadInvalid))?;
331+ let stream = req.into_body().into_data_stream();
332+ let stream = stream.map_err(|e| Error::Stream(e.to_string()));
333+ state.oci.write_chunk(Box::pin(stream), &uuid).await?;
334+ let mut res = Response::new(axum::body::Body::empty());
335+ let status = res.status_mut();
336+ *status = StatusCode::ACCEPTED;
337+ let uuid_str = uuid.to_string();
338+ let uri = format!("/v2/upload/{}", uuid_str);
339+ let headers = res.headers_mut();
340+ headers.insert("Location", uri.parse().unwrap());
341+ Ok(res)
342+ }
343+
344+ pub async fn close_blob(
345+ State(state): State<AppState>,
346+ Path(upload_uuid): Path<String>,
347+ Query(query): Query<UploadQuery>,
348+ req: Request,
349+ ) -> Result<StatusCode, Error> {
350+ let digest = Digest::from_str(&query.digest)?;
351+ let uuid = Uuid::from_str(&upload_uuid)
352+ .map_err(|_| Error::Code(crate::error::Code::BlobUploadInvalid))?;
353+ let headers = req.headers();
354+ if headers.get(CONTENT_LENGTH).is_some() {
355+ let stream = req.into_body().into_data_stream();
356+ let stream = stream.map_err(|e| Error::Stream(e.to_string()));
357+ state.oci.write_chunk(Box::pin(stream), &uuid).await?;
358+ }
359+ state.oci.commit_blob(&uuid, &digest).await?;
360+ Ok(StatusCode::CREATED)
361+ }
362+
363+ const OCI_CHUNK_MIN_LENGTH: usize = 10_000_000;
364+
365+ /// Initiate a blob upload
366+ /// FIXME: Per the spec for chunked uploads a header of Content-Length: 0
367+ /// MUST be present (for some reason).
368 pub async fn initiate_blob(
369 State(state): State<AppState>,
370 Extension(namespace): Extension<Namespace>,
371 ) -> Result<Response, Error> {
372 let uuid = state.oci.new_blob(&namespace).await?;
373 let mut res = Response::new(axum::body::Body::empty());
374+ let status = res.status_mut();
375+ *status = StatusCode::ACCEPTED;
376 let headers = res.headers_mut();
377 let uuid_str = uuid.to_string();
378 let uri = format!("/v2/upload/{}", uuid_str);
379 headers.insert("Location", uri.parse().unwrap());
380+ // FIXME: make configurable
381+ headers.insert(
382+ "OCI-Chunk-Min-Length",
383+ HeaderValue::from_str(&OCI_CHUNK_MIN_LENGTH.to_string()).unwrap(),
384+ );
385 Ok(res)
386 }
387
388 diff --git a/src/lib.rs b/src/lib.rs
389index 8566750..f41d040 100644
390--- a/src/lib.rs
391+++ b/src/lib.rs
392 @@ -2,6 +2,7 @@ use std::{fmt::Display, path::Path, str::FromStr};
393
394 use error::Error;
395 use regex::Regex;
396+ use relative_path::RelativePath;
397
398 pub mod address;
399 pub mod error;
400 @@ -21,8 +22,8 @@ const NAME_REGEXP_MATCH: &str =
401 pub struct Namespace(String);
402
403 impl Namespace {
404- pub fn path(&self) -> &Path {
405- Path::new(&self.0)
406+ pub fn path(&self) -> &RelativePath {
407+ RelativePath::new(&self.0)
408 }
409 }
410
411 @@ -45,6 +46,12 @@ impl Display for Namespace {
412 }
413 }
414
415+ impl AsRef<str> for Namespace {
416+ fn as_ref(&self) -> &str {
417+ &self.0
418+ }
419+ }
420+
421 #[cfg(test)]
422 mod test {
423 use super::*;
424 diff --git a/src/oci_interface.rs b/src/oci_interface.rs
425index 769095a..7e7ae67 100644
426--- a/src/oci_interface.rs
427+++ b/src/oci_interface.rs
428 @@ -6,7 +6,6 @@ use oci_spec::{
429 distribution::{Reference, TagList},
430 image::{Digest, ImageManifest},
431 };
432- use serde::Serialize;
433 use uuid::Uuid;
434
435 use crate::{
436 @@ -33,34 +32,34 @@ impl OciInterface {
437 pub async fn new_blob(&self, namespace: &Namespace) -> Result<Uuid, Error> {
438 let uuid = Uuid::new_v4();
439 self.storage
440- .write_all(&TempBlob::from(&uuid).into(), &[])
441+ .write_all(&Address::new(&TempBlob::from(&uuid)), &[])
442 .await
443 .map_err(Error::Storage)?;
444 Ok(uuid)
445 }
446
447- pub async fn write_blob<S>(
448- &self,
449- mut stream: Pin<Box<S>>,
450- uuid: &Uuid,
451- digest: &Digest,
452- ) -> Result<(), Error>
453+ pub async fn commit_blob(&self, uuid: &Uuid, digest: &Digest) -> Result<(), Error> {
454+ let tmp_blob_addr = Address::new(&TempBlob::from(uuid));
455+ let blob_addr = Address::data(&Blob::from(digest));
456+ self.storage
457+ .mv(&tmp_blob_addr, &blob_addr)
458+ .await
459+ .map_err(Error::Storage)?;
460+ Ok(())
461+ }
462+
463+ pub async fn write_chunk<S>(&self, mut stream: Pin<Box<S>>, uuid: &Uuid) -> Result<(), Error>
464 where
465 S: Stream<Item = Result<Bytes, Error>>,
466 {
467- let tmp_blob: Address = TempBlob::from(uuid).into();
468+ let tmp_blob_addr = Address::new(&TempBlob::from(uuid));
469 while let Some(item) = stream.next().await {
470 let chunk = item?.to_vec();
471 self.storage
472- .write(&tmp_blob, chunk.as_slice())
473+ .write(&tmp_blob_addr, chunk.as_slice())
474 .await
475 .map_err(Error::Storage)?;
476 }
477- let blob: Address = Blob::from(digest).into();
478- self.storage
479- .mv(&tmp_blob, &blob.data())
480- .await
481- .map_err(Error::Storage)?;
482 Ok(())
483 }
484
485 @@ -68,22 +67,41 @@ impl OciInterface {
486 &self,
487 namespace: &Namespace,
488 digest: &Digest,
489- manifest: &ImageManifest,
490+ tag: &str,
491 manifest_bytes: &Bytes,
492 ) -> Result<(), Error> {
493- let blob: Address = Blob::from(digest).into();
494+ let tmp_blob_addr = Address::new(&TempBlob::from(&Uuid::new_v4()));
495+ self.storage
496+ .write_all(&tmp_blob_addr, manifest_bytes.to_vec().as_slice())
497+ .await
498+ .map_err(Error::Storage)?;
499+ let blob_address = &Address::data(&Blob::from(digest));
500 self.storage
501- .write_all(&blob.data(), manifest_bytes.to_vec().as_slice())
502+ .mv(&tmp_blob_addr, blob_address)
503 .await
504 .map_err(Error::Storage)?;
505+ let tag_addr = &Address::link(&crate::address::Tag {
506+ namespace,
507+ name: tag,
508+ });
509 self.storage
510- .write_all(
511- &crate::address::Manifest { namespace, digest }.into(),
512- digest.to_string().as_bytes(),
513- )
514+ .write_all(tag_addr, digest.to_string().as_bytes())
515 .await
516 .map_err(Error::Storage)?;
517 Ok(())
518+ // let blob_addr = Address::data(&Blob::from(digest));
519+ // self.storage
520+ // .write_all(&blob_addr, manifest_bytes.to_vec().as_slice())
521+ // .await
522+ // .map_err(Error::Storage)?;
523+ // self.storage
524+ // .write_all(
525+ // &Address::link(&crate::address::Manifest { namespace, digest }),
526+ // digest.to_string().as_bytes(),
527+ // )
528+ // .await
529+ // .map_err(Error::Storage)?;
530+ // Ok(())
531 }
532
533 pub async fn read_manifest(
534 @@ -106,9 +124,9 @@ impl OciInterface {
535 }
536
537 pub async fn has_blob(&self, digest: &Digest) -> Result<bool, Error> {
538- let address: Address = Blob::from(digest).into();
539+ let blob_addr = Address::data(&Blob::from(digest));
540 self.storage
541- .exists(&address.data())
542+ .exists(&blob_addr)
543 .await
544 .map_err(Error::Storage)
545 }
546 diff --git a/src/routes.rs b/src/routes.rs
547index fd1e36c..ffece69 100644
548--- a/src/routes.rs
549+++ b/src/routes.rs
550 @@ -3,7 +3,7 @@ use std::str::FromStr;
551 use std::sync::Arc;
552
553 use axum::extract::{DefaultBodyLimit, Request};
554- use axum::routing::{head, post, put};
555+ use axum::routing::{head, patch, post, put};
556 use axum::{Router, routing::get};
557 use http::Uri;
558
559 @@ -60,7 +60,12 @@ const MAXIMUM_MANIFEST_SIZE: usize = 5_000_000;
560 pub fn router(storage: impl Storage + 'static) -> Router {
561 Router::new()
562 .route("/v2", get(crate::handlers::index))
563- .route("/upload/{uuid}", put(crate::handlers::write_blob))
564+ .route(
565+ "/upload/{reference}",
566+ patch(crate::handlers::write_blob_chunk),
567+ )
568+ .route("/upload/{reference}", post(crate::handlers::write_blob))
569+ .route("/upload/{reference}", put(crate::handlers::close_blob))
570 .route("/blobs/uploads", post(crate::handlers::initiate_blob))
571 .route("/blobs/{digest}", head(crate::handlers::stat_blob))
572 .route(
573 diff --git a/src/storage.rs b/src/storage.rs
574index 88339a8..0391647 100644
575--- a/src/storage.rs
576+++ b/src/storage.rs
577 @@ -1,4 +1,4 @@
578- use std::{io::Error as IoError, path::Path};
579+ use std::{io::Error as IoError};
580
581 use crate::address::Address;
582
583 diff --git a/src/storage_fs.rs b/src/storage_fs.rs
584index 16a9894..faeb078 100644
585--- a/src/storage_fs.rs
586+++ b/src/storage_fs.rs
587 @@ -59,6 +59,7 @@ impl Storage for FileSystem {
588 /// }
589
590 async fn write_all(&self, addr: &Address, bytes: &[u8]) -> Result<(), Error> {
591+ tracing::info!("Writing blob to {}", addr);
592 let path = addr.as_path(&self.base);
593 self.ensure_dir(&path).await?;
594 let mut fp = tokio::fs::OpenOptions::new()