// src/oci_interface.rs (-rw-r--r--, 7.3 KiB)
1use std::{pin::Pin, str::FromStr};
2
3use bytes::Bytes;
4use futures::{Stream, StreamExt};
5use oci_spec::{
6 distribution::TagList,
7 image::{Digest, ImageManifest},
8};
9use sha2::{Digest as HashDigest, Sha256};
10use uuid::Uuid;
11
12use crate::{
13 Namespace, TagOrDigest,
14 address::{Address, Addressable, Blob, LayerLink, Reference, Tag, TempBlob},
15 error::Error,
16 storage::{InnerStream, Storage, StorageIface},
17};
18
19use base64::prelude::*;
20
/// Path constants used by this module.
pub mod paths {
    /// The `/repositories` path prefix.
    // NOTE(review): presumably the root under which per-namespace data lives — confirm
    // against the address implementations.
    pub const REPOSITORIES: &str = "/repositories";
}
24
25#[derive(Clone)]
26pub struct OciInterface {
27 pub storage: Storage,
28}
29
30impl OciInterface {
31 fn store(&self) -> impl StorageIface {
32 self.storage.inner()
33 }
34
35 async fn resolve_link<A>(&self, link: &A) -> Result<Address, Error>
36 where
37 A: Addressable + Send + Sync,
38 {
39 let digest_bytes = self
40 .store()
41 .read_bytes(link)
42 .await
43 .map_err(Error::Storage)?
44 .to_vec();
45 let digest_str = String::from_utf8_lossy(digest_bytes.as_slice());
46 let digest = Digest::from_str(&digest_str)?;
47 let blob_addr = Blob::from(&digest);
48 Ok(blob_addr.address())
49 }
50
51 pub async fn new_blob(&self, namespace: &Namespace) -> Result<Uuid, Error> {
52 let uuid = Uuid::new_v4();
53 self.store()
54 .write_all(
55 &TempBlob {
56 uuid: &uuid,
57 namespace,
58 },
59 &[],
60 )
61 .await
62 .map_err(Error::Storage)?;
63 Ok(uuid)
64 }
65
66 pub async fn commit_blob(
67 &self,
68 namespace: &Namespace,
69 uuid: &Uuid,
70 digest: &Digest,
71 ) -> Result<(), Error> {
72 self.store()
73 .mv(&TempBlob { uuid, namespace }, &Blob::from(digest))
74 .await
75 .map_err(Error::Storage)?;
76 Ok(())
77 }
78
79 pub async fn delete_blob(&self, namespace: &Namespace, digest: &Digest) -> Result<(), Error> {
80 self.store()
81 .delete(&LayerLink { namespace, digest })
82 .await
83 .map_err(Error::Storage)?;
84 Ok(())
85 }
86
87 pub async fn write_chunk<S>(
88 &self,
89 mut stream: Pin<Box<S>>,
90 namespace: &Namespace,
91 uuid: &Uuid,
92 ) -> Result<(), Error>
93 where
94 S: Stream<Item = Result<Bytes, Error>>,
95 {
96 let tmp_blob_addr = &TempBlob { namespace, uuid };
97 while let Some(item) = stream.next().await {
98 let chunk = item?.to_vec();
99 self.store()
100 .write(tmp_blob_addr, chunk.as_slice())
101 .await
102 .map_err(Error::Storage)?;
103 }
104 Ok(())
105 }
106
107 pub async fn write_manifest(
108 &self,
109 namespace: &Namespace,
110 tag_or_digest: &TagOrDigest,
111 manifest: &ImageManifest,
112 manifest_bytes: &Bytes,
113 ) -> Result<(), Error> {
114 let uuid = Uuid::new_v4();
115 let tmp_blob_addr = TempBlob {
116 uuid: &uuid,
117 namespace,
118 };
119 self.storage
120 .inner()
121 .write_all(&tmp_blob_addr, manifest_bytes.to_vec().as_slice())
122 .await
123 .map_err(Error::Storage)?;
124 // TODO: Generalize in storage.rs
125 let hashed = Sha256::digest(manifest_bytes);
126 let hash_str = base16ct::lower::encode_string(&hashed);
127 let digest = Digest::from_str(&format!("sha256:{}", hash_str)).unwrap();
128 let blob_address = &Blob::from(&digest);
129 self.store()
130 .mv(&tmp_blob_addr, blob_address)
131 .await
132 .map_err(Error::Storage)?;
133 self.store()
134 .write_all(
135 &Reference {
136 namespace,
137 digest: &digest,
138 },
139 digest.to_string().as_bytes(),
140 )
141 .await
142 .map_err(Error::Storage)?;
143
144 for layer in manifest.layers() {
145 // FIXME: iirc each blob needs to be resolved
146 let digest = layer.digest();
147 let digest_str = digest.to_string();
148 let digest_bytes = digest_str.as_bytes();
149 let layer_addr = LayerLink { namespace, digest };
150 self.storage
151 .inner()
152 .write_all(&layer_addr, digest_bytes)
153 .await
154 .map_err(Error::Storage)?;
155 }
156 let oci_config_digest = manifest.config().digest();
157 let manifest_embedded_data = manifest.config().data();
158 if let Some(manifest_embedded_data) = manifest_embedded_data {
159 // let manifest_str = manifest.to_string_pretty().unwrap();
160 let decoded = BASE64_STANDARD.decode(manifest_embedded_data).unwrap();
161 self.storage
162 .inner()
163 .write_all(
164 &Blob {
165 digest: oci_config_digest,
166 },
167 decoded.as_slice(),
168 )
169 .await
170 .map_err(Error::Storage)?;
171 self.storage
172 .inner()
173 .write_all(
174 &LayerLink {
175 namespace,
176 digest: oci_config_digest,
177 },
178 decoded.as_slice(),
179 )
180 .await
181 .map_err(Error::Storage)?;
182 }
183
184 if let TagOrDigest::Tag(name) = tag_or_digest {
185 self.store()
186 .write_all(&Tag { namespace, name }, digest.to_string().as_bytes())
187 .await
188 .map_err(Error::Storage)?;
189 }
190
191 Ok(())
192 }
193
194 pub async fn read_manifest(
195 &self,
196 namespace: &Namespace,
197 tag_or_digest: &TagOrDigest,
198 ) -> Result<ImageManifest, Error> {
199 let blob_addr = match tag_or_digest {
200 TagOrDigest::Tag(name) => self.resolve_link(&Tag { namespace, name }).await?,
201 TagOrDigest::Digest(digest) => {
202 self.resolve_link(&Reference { namespace, digest }).await?
203 }
204 };
205 let manifest_bytes = self
206 .storage
207 .inner()
208 .read_bytes(&blob_addr)
209 .await
210 .map_err(Error::Storage)?;
211 let manifest: ImageManifest =
212 serde_json::de::from_slice(manifest_bytes.iter().as_slice()).expect("manifest invalid");
213 Ok(manifest)
214 }
215
216 pub async fn has_blob(&self, digest: &Digest) -> Result<bool, Error> {
217 let blob_addr = Blob::from(digest);
218 self.store()
219 .exists(&blob_addr)
220 .await
221 .map_err(Error::Storage)
222 }
223
224 pub async fn has_manifest(
225 &self,
226 namespace: &Namespace,
227 tag_or_digest: &TagOrDigest,
228 ) -> Result<bool, Error> {
229 let blob_addr = match tag_or_digest {
230 TagOrDigest::Tag(name) => self.resolve_link(&Tag { namespace, name }).await?,
231 TagOrDigest::Digest(digest) => {
232 self.resolve_link(&Reference { namespace, digest }).await?
233 }
234 };
235 self.store()
236 .exists(&blob_addr)
237 .await
238 .map_err(Error::Storage)
239 }
240
241 pub async fn read_blob(&self, digest: &Digest) -> Result<InnerStream, Error> {
242 let blob_addr = Blob::from(digest);
243 self.store().read(&blob_addr).await.map_err(Error::Storage)
244 }
245}