Commit

Author:

Hash:

Timestamp:

+67 -27 +/-7 browse

Kevin Schoon [me@kevinschoon.com]

dc7eb0317fdce73a9b56ca284c7f99117530f765

Tue, 08 Apr 2025 21:51:50 +0000 (4 months ago)

reads work
1diff --git a/Cargo.lock b/Cargo.lock
2index f20db56..956e53f 100644
3--- a/Cargo.lock
4+++ b/Cargo.lock
5 @@ -570,6 +570,7 @@ dependencies = [
6 "serde_json",
7 "thiserror",
8 "tokio",
9+ "tokio-util",
10 "tower",
11 "tower-http",
12 "tracing",
13 @@ -925,6 +926,19 @@ dependencies = [
14 ]
15
16 [[package]]
17+ name = "tokio-util"
18+ version = "0.7.14"
19+ source = "registry+https://github.com/rust-lang/crates.io-index"
20+ checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034"
21+ dependencies = [
22+ "bytes",
23+ "futures-core",
24+ "futures-sink",
25+ "pin-project-lite",
26+ "tokio",
27+ ]
28+
29+ [[package]]
30 name = "tower"
31 version = "0.5.2"
32 source = "registry+https://github.com/rust-lang/crates.io-index"
33 diff --git a/Cargo.toml b/Cargo.toml
34index 55c172c..058c908 100644
35--- a/Cargo.toml
36+++ b/Cargo.toml
37 @@ -19,6 +19,7 @@ http = "1.3.1"
38 tower = { version = "0.5.2", features = ["util"] }
39 bytes = "1.10.1"
40 relative-path = "1.9.3"
41+ tokio-util = { version = "0.7.14", features = ["io"] }
42
43 [dev-dependencies]
44 tokio = { version = "1.44.1", features = ["full"] }
45 diff --git a/src/handlers.rs b/src/handlers.rs
46index 91c9488..1d4a40a 100644
47--- a/src/handlers.rs
48+++ b/src/handlers.rs
49 @@ -21,7 +21,7 @@ use serde::Deserialize;
50 use serde_json::json;
51 use uuid::Uuid;
52
53- use crate::{Namespace, error::Error, routes::AppState};
54+ use crate::{address::{Address, Blob}, error::Error, routes::AppState, storage::StorageIface, Namespace};
55
56 pub async fn index() -> Result<Json<serde_json::Value>, Error> {
57 Ok(Json(json!({})))
58 @@ -41,8 +41,12 @@ pub async fn read_blob(
59 State(state): State<Arc<AppState>>,
60 Extension(namespace): Extension<Namespace>,
61 Path(digest): Path<String>,
62- ) {
63+ ) -> Result<Response, Error> {
64+ let digest = Digest::from_str(&digest)?;
65+ let handle = state.oci.read_blob(&digest).await?;
66+ axum::body::Body::from_stream(handle);
67 println!("Namespace is: {}", namespace);
68+ todo!()
69 }
70
71 pub async fn stat_blob(
72 diff --git a/src/oci_interface.rs b/src/oci_interface.rs
73index 80eec14..5372ca6 100644
74--- a/src/oci_interface.rs
75+++ b/src/oci_interface.rs
76 @@ -12,7 +12,7 @@ use crate::{
77 Namespace,
78 address::{Address, Blob, Manifest, TempBlob},
79 error::Error,
80- storage::{Storage, StorageIface},
81+ storage::{InnerStream, Storage, StorageIface},
82 };
83
84 pub mod paths {
85 @@ -75,7 +75,8 @@ impl OciInterface {
86 manifest_bytes: &Bytes,
87 ) -> Result<(), Error> {
88 let tmp_blob_addr = Address::new(&TempBlob::from(&Uuid::new_v4()));
89- self.storage.inner()
90+ self.storage
91+ .inner()
92 .write_all(&tmp_blob_addr, manifest_bytes.to_vec().as_slice())
93 .await
94 .map_err(Error::Storage)?;
95 @@ -135,6 +136,11 @@ impl OciInterface {
96 .map_err(Error::Storage)
97 }
98
99+ pub async fn read_blob(&self, digest: &Digest) -> Result<InnerStream, Error> {
100+ let blob_addr = Address::data(&Blob::from(digest));
101+ self.store().read(&blob_addr).await.map_err(Error::Storage)
102+ }
103+
104 // pub async fn exists(&self, digest: &Digest) -> Result<bool, Error> {
105 // Ok(self
106 // .storage
107 diff --git a/src/routes.rs b/src/routes.rs
108index 43a8203..94751e0 100644
109--- a/src/routes.rs
110+++ b/src/routes.rs
111 @@ -1,4 +1,3 @@
112- use std::io::Bytes;
113 use std::str::FromStr;
114 use std::sync::Arc;
115
116 @@ -68,6 +67,7 @@ pub fn router(storage: &Storage) -> Router {
117 .route("/upload/{reference}", put(crate::handlers::close_blob))
118 .route("/blobs/uploads", post(crate::handlers::initiate_blob))
119 .route("/blobs/{digest}", head(crate::handlers::stat_blob))
120+ .route("/blobs/{digest}", get(crate::handlers::read_blob))
121 .route(
122 "/manifests/{digest}",
123 put(crate::handlers::write_manifest)
124 diff --git a/src/storage.rs b/src/storage.rs
125index 24c4767..a158341 100644
126--- a/src/storage.rs
127+++ b/src/storage.rs
128 @@ -1,7 +1,8 @@
129- use std::{io::Error as IoError, path::PathBuf, sync::Arc};
130+ use std::{io::Error as IoError, path::PathBuf, pin::Pin, sync::Arc};
131
132 use async_trait::async_trait;
133- use futures::{Stream, StreamExt, lock::Mutex};
134+ use bytes::Bytes;
135+ use futures::{lock::Mutex, stream::BoxStream, Stream, StreamExt};
136
137 use crate::{address::Address, storage_fs::FileSystem};
138
139 @@ -15,10 +16,33 @@ pub enum Error {
140 NotFound,
141 }
142
143+ pub struct InnerStream {
144+ stream: BoxStream<'static, Result<Bytes, std::io::Error>>,
145+ }
146+
147+ impl InnerStream {
148+ pub fn new(stream: BoxStream<'static, Result<Bytes, std::io::Error>>) -> Self {
149+ InnerStream {
150+ stream
151+ }
152+ }
153+ }
154+
155+ impl Stream for InnerStream {
156+ type Item = Result<Bytes, std::io::Error>;
157+
158+ fn poll_next(
159+ self: Pin<&mut Self>,
160+ cx: &mut std::task::Context<'_>,
161+ ) -> std::task::Poll<Option<Self::Item>> {
162+ Pin::new(&mut self.get_mut().stream).poll_next(cx)
163+ }
164+ }
165+
166 /// The storage trait needs to be implemented for accessing objects from the
167 /// platform. This API is based on registry/storage/driver/storagedriver.go in
168 /// the distribution codebase.
169- pub(crate) trait StorageIface: Sync + Send {
170+ pub trait StorageIface: Sync + Send {
171 /// List a single directory of objects
172 // async fn list(&self, addr: &Address) -> Result<Vec<Object>, Error>;
173 // async fn stat(&self, addr: &Address) -> Result<Option<Object>, Error>;
174 @@ -36,6 +60,7 @@ pub(crate) trait StorageIface: Sync + Send {
175 -> impl Future<Output = Result<(), Error>> + Send;
176 fn mv(&self, src: &Address, dst: &Address) -> impl Future<Output = Result<(), Error>> + Send;
177 // fn mv (&self, )std::future::Future<Output = ()> + Send
178+ fn read(&self, src: &Address) -> impl Future<Output = Result<InnerStream, Error>> + Send;
179 }
180
181 #[derive(Debug, Clone)]
182 @@ -56,21 +81,3 @@ impl Storage {
183 }
184 }
185 }
186-
187- // #[async_trait]
188- // pub trait StreamProcessor {
189- // async fn process_stream(&self, stream: Arc<Mutex<impl Stream<Item = i32> + Send + Unpin + 'static>>);
190- // }
191- //
192- // pub struct MyStreamProcessor;
193- //
194- // #[async_trait]
195- // impl StreamProcessor for MyStreamProcessor {
196- // async fn process_stream(&self, stream: Arc<Mutex<impl Stream<Item = i32> + Send + Unpin + 'static>>) {
197- // let mut stream = stream.lock().await;
198- //
199- // while let Some(item) = stream.next().await {
200- // println!("Processing item: {}", item);
201- // }
202- // }
203- // }
204 diff --git a/src/storage_fs.rs b/src/storage_fs.rs
205index 98b09f7..d36a5de 100644
206--- a/src/storage_fs.rs
207+++ b/src/storage_fs.rs
208 @@ -3,11 +3,12 @@ use std::{
209 path::{Path, PathBuf},
210 };
211
212+ use futures::StreamExt;
213 use tokio::io::AsyncWriteExt;
214
215 use crate::{
216 address::Address,
217- storage::{Error, StorageIface},
218+ storage::{Error, InnerStream, StorageIface},
219 };
220
221 #[derive(Clone)]
222 @@ -89,4 +90,11 @@ impl StorageIface for FileSystem {
223 tokio::fs::rename(src_path, dst_path).await?;
224 Ok(())
225 }
226+
227+ async fn read(&self, src: &Address) -> Result<InnerStream, Error> {
228+ let path = src.as_path(&self.base);
229+ let fp = tokio::fs::File::open(path.as_path()).await.unwrap();
230+ let stream = tokio_util::io::ReaderStream::new(fp);
231+ Ok(InnerStream::new(stream.boxed()))
232+ }
233 }