1 | use std::{pin::Pin, str::FromStr}; |
2 | |
3 | use bytes::Bytes; |
4 | use futures::{Stream, StreamExt}; |
5 | use oci_spec::{ |
6 | distribution::TagList, |
7 | image::{Digest, ImageManifest}, |
8 | }; |
9 | use sha2::{Digest as HashDigest, Sha256}; |
10 | use uuid::Uuid; |
11 | |
12 | use crate::{ |
13 | Namespace, TagOrDigest, |
14 | address::{Address, Addressable, Blob, LayerLink, Reference, Tag, TempBlob}, |
15 | error::Error, |
16 | storage::{InnerStream, Storage, StorageIface}, |
17 | }; |
18 | |
19 | use base64::prelude::*; |
20 | |
21 | pub mod paths { |
22 | pub const REPOSITORIES: &str = "/repositories"; |
23 | } |
24 | |
25 | #[derive(Clone)] |
26 | pub struct OciInterface { |
27 | pub storage: Storage, |
28 | } |
29 | |
30 | impl OciInterface { |
31 | fn store(&self) -> impl StorageIface { |
32 | self.storage.inner() |
33 | } |
34 | |
35 | async fn resolve_link<A>(&self, link: &A) -> Result<Address, Error> |
36 | where |
37 | A: Addressable + Send + Sync, |
38 | { |
39 | let digest_bytes = self |
40 | .store() |
41 | .read_bytes(link) |
42 | .await |
43 | .map_err(Error::Storage)? |
44 | .to_vec(); |
45 | let digest_str = String::from_utf8_lossy(digest_bytes.as_slice()); |
46 | let digest = Digest::from_str(&digest_str)?; |
47 | let blob_addr = Blob::from(&digest); |
48 | Ok(blob_addr.address()) |
49 | } |
50 | |
51 | pub async fn new_blob(&self, namespace: &Namespace) -> Result<Uuid, Error> { |
52 | let uuid = Uuid::new_v4(); |
53 | self.store() |
54 | .write_all( |
55 | &TempBlob { |
56 | uuid: &uuid, |
57 | namespace, |
58 | }, |
59 | &[], |
60 | ) |
61 | .await |
62 | .map_err(Error::Storage)?; |
63 | Ok(uuid) |
64 | } |
65 | |
66 | pub async fn commit_blob( |
67 | &self, |
68 | namespace: &Namespace, |
69 | uuid: &Uuid, |
70 | digest: &Digest, |
71 | ) -> Result<(), Error> { |
72 | self.store() |
73 | .mv(&TempBlob { uuid, namespace }, &Blob::from(digest)) |
74 | .await |
75 | .map_err(Error::Storage)?; |
76 | Ok(()) |
77 | } |
78 | |
79 | pub async fn delete_blob(&self, namespace: &Namespace, digest: &Digest) -> Result<(), Error> { |
80 | self.store() |
81 | .delete(&LayerLink { namespace, digest }) |
82 | .await |
83 | .map_err(Error::Storage)?; |
84 | Ok(()) |
85 | } |
86 | |
87 | pub async fn write_chunk<S>( |
88 | &self, |
89 | mut stream: Pin<Box<S>>, |
90 | namespace: &Namespace, |
91 | uuid: &Uuid, |
92 | ) -> Result<(), Error> |
93 | where |
94 | S: Stream<Item = Result<Bytes, Error>>, |
95 | { |
96 | let tmp_blob_addr = &TempBlob { namespace, uuid }; |
97 | while let Some(item) = stream.next().await { |
98 | let chunk = item?.to_vec(); |
99 | self.store() |
100 | .write(tmp_blob_addr, chunk.as_slice()) |
101 | .await |
102 | .map_err(Error::Storage)?; |
103 | } |
104 | Ok(()) |
105 | } |
106 | |
107 | pub async fn write_manifest( |
108 | &self, |
109 | namespace: &Namespace, |
110 | tag_or_digest: &TagOrDigest, |
111 | manifest: &ImageManifest, |
112 | manifest_bytes: &Bytes, |
113 | ) -> Result<(), Error> { |
114 | let uuid = Uuid::new_v4(); |
115 | let tmp_blob_addr = TempBlob { |
116 | uuid: &uuid, |
117 | namespace, |
118 | }; |
119 | self.storage |
120 | .inner() |
121 | .write_all(&tmp_blob_addr, manifest_bytes.to_vec().as_slice()) |
122 | .await |
123 | .map_err(Error::Storage)?; |
124 | // TODO: Generalize in storage.rs |
125 | let hashed = Sha256::digest(manifest_bytes); |
126 | let hash_str = base16ct::lower::encode_string(&hashed); |
127 | let digest = Digest::from_str(&format!("sha256:{}", hash_str)).unwrap(); |
128 | let blob_address = &Blob::from(&digest); |
129 | self.store() |
130 | .mv(&tmp_blob_addr, blob_address) |
131 | .await |
132 | .map_err(Error::Storage)?; |
133 | self.store() |
134 | .write_all( |
135 | &Reference { |
136 | namespace, |
137 | digest: &digest, |
138 | }, |
139 | digest.to_string().as_bytes(), |
140 | ) |
141 | .await |
142 | .map_err(Error::Storage)?; |
143 | |
144 | for layer in manifest.layers() { |
145 | // FIXME: iirc each blob needs to be resolved |
146 | let digest = layer.digest(); |
147 | let digest_str = digest.to_string(); |
148 | let digest_bytes = digest_str.as_bytes(); |
149 | let layer_addr = LayerLink { namespace, digest }; |
150 | self.storage |
151 | .inner() |
152 | .write_all(&layer_addr, digest_bytes) |
153 | .await |
154 | .map_err(Error::Storage)?; |
155 | } |
156 | let oci_config_digest = manifest.config().digest(); |
157 | let manifest_embedded_data = manifest.config().data(); |
158 | if let Some(manifest_embedded_data) = manifest_embedded_data { |
159 | // let manifest_str = manifest.to_string_pretty().unwrap(); |
160 | let decoded = BASE64_STANDARD.decode(manifest_embedded_data).unwrap(); |
161 | self.storage |
162 | .inner() |
163 | .write_all( |
164 | &Blob { |
165 | digest: oci_config_digest, |
166 | }, |
167 | decoded.as_slice(), |
168 | ) |
169 | .await |
170 | .map_err(Error::Storage)?; |
171 | self.storage |
172 | .inner() |
173 | .write_all( |
174 | &LayerLink { |
175 | namespace, |
176 | digest: oci_config_digest, |
177 | }, |
178 | decoded.as_slice(), |
179 | ) |
180 | .await |
181 | .map_err(Error::Storage)?; |
182 | } |
183 | |
184 | if let TagOrDigest::Tag(name) = tag_or_digest { |
185 | self.store() |
186 | .write_all(&Tag { namespace, name }, digest.to_string().as_bytes()) |
187 | .await |
188 | .map_err(Error::Storage)?; |
189 | } |
190 | |
191 | Ok(()) |
192 | } |
193 | |
194 | pub async fn read_manifest( |
195 | &self, |
196 | namespace: &Namespace, |
197 | tag_or_digest: &TagOrDigest, |
198 | ) -> Result<ImageManifest, Error> { |
199 | let blob_addr = match tag_or_digest { |
200 | TagOrDigest::Tag(name) => self.resolve_link(&Tag { namespace, name }).await?, |
201 | TagOrDigest::Digest(digest) => { |
202 | self.resolve_link(&Reference { namespace, digest }).await? |
203 | } |
204 | }; |
205 | let manifest_bytes = self |
206 | .storage |
207 | .inner() |
208 | .read_bytes(&blob_addr) |
209 | .await |
210 | .map_err(Error::Storage)?; |
211 | let manifest: ImageManifest = |
212 | serde_json::de::from_slice(manifest_bytes.iter().as_slice()).expect("manifest invalid"); |
213 | Ok(manifest) |
214 | } |
215 | |
216 | pub async fn has_blob(&self, digest: &Digest) -> Result<bool, Error> { |
217 | let blob_addr = Blob::from(digest); |
218 | self.store() |
219 | .exists(&blob_addr) |
220 | .await |
221 | .map_err(Error::Storage) |
222 | } |
223 | |
224 | pub async fn has_manifest( |
225 | &self, |
226 | namespace: &Namespace, |
227 | tag_or_digest: &TagOrDigest, |
228 | ) -> Result<bool, Error> { |
229 | let blob_addr = match tag_or_digest { |
230 | TagOrDigest::Tag(name) => self.resolve_link(&Tag { namespace, name }).await?, |
231 | TagOrDigest::Digest(digest) => { |
232 | self.resolve_link(&Reference { namespace, digest }).await? |
233 | } |
234 | }; |
235 | self.store() |
236 | .exists(&blob_addr) |
237 | .await |
238 | .map_err(Error::Storage) |
239 | } |
240 | |
241 | pub async fn read_blob(&self, digest: &Digest) -> Result<InnerStream, Error> { |
242 | let blob_addr = Blob::from(digest); |
243 | self.store().read(&blob_addr).await.map_err(Error::Storage) |
244 | } |
245 | } |