use std::{pin::Pin, str::FromStr}; use bytes::Bytes; use futures::{Stream, StreamExt}; use oci_spec::{ distribution::TagList, image::{Digest, ImageManifest}, }; use sha2::{Digest as HashDigest, Sha256}; use uuid::Uuid; use crate::{ Namespace, TagOrDigest, address::{Address, Addressable, Blob, LayerLink, Reference, Tag, TempBlob}, error::Error, storage::{InnerStream, Storage, StorageIface}, }; use base64::prelude::*; pub mod paths { pub const REPOSITORIES: &str = "/repositories"; } #[derive(Clone)] pub struct OciInterface { pub storage: Storage, } impl OciInterface { fn store(&self) -> impl StorageIface { self.storage.inner() } async fn resolve_link(&self, link: &A) -> Result where A: Addressable + Send + Sync, { let digest_bytes = self .store() .read_bytes(link) .await .map_err(Error::Storage)? .to_vec(); let digest_str = String::from_utf8_lossy(digest_bytes.as_slice()); let digest = Digest::from_str(&digest_str)?; let blob_addr = Blob::from(&digest); Ok(blob_addr.address()) } pub async fn new_blob(&self, namespace: &Namespace) -> Result { let uuid = Uuid::new_v4(); self.store() .write_all( &TempBlob { uuid: &uuid, namespace, }, &[], ) .await .map_err(Error::Storage)?; Ok(uuid) } pub async fn commit_blob( &self, namespace: &Namespace, uuid: &Uuid, digest: &Digest, ) -> Result<(), Error> { self.store() .mv(&TempBlob { uuid, namespace }, &Blob::from(digest)) .await .map_err(Error::Storage)?; Ok(()) } pub async fn delete_blob(&self, namespace: &Namespace, digest: &Digest) -> Result<(), Error> { self.store() .delete(&LayerLink { namespace, digest }) .await .map_err(Error::Storage)?; Ok(()) } pub async fn write_chunk( &self, mut stream: Pin>, namespace: &Namespace, uuid: &Uuid, ) -> Result<(), Error> where S: Stream>, { let tmp_blob_addr = &TempBlob { namespace, uuid }; while let Some(item) = stream.next().await { let chunk = item?.to_vec(); self.store() .write(tmp_blob_addr, chunk.as_slice()) .await .map_err(Error::Storage)?; } Ok(()) } pub async fn write_manifest( &self, namespace: &Namespace, tag_or_digest: &TagOrDigest, manifest: &ImageManifest, manifest_bytes: &Bytes, ) -> Result<(), Error> { let uuid = Uuid::new_v4(); let tmp_blob_addr = TempBlob { uuid: &uuid, namespace, }; self.storage .inner() .write_all(&tmp_blob_addr, manifest_bytes.to_vec().as_slice()) .await .map_err(Error::Storage)?; // TODO: Generalize in storage.rs let hashed = Sha256::digest(manifest_bytes); let hash_str = base16ct::lower::encode_string(&hashed); let digest = Digest::from_str(&format!("sha256:{}", hash_str)).unwrap(); let blob_address = &Blob::from(&digest); self.store() .mv(&tmp_blob_addr, blob_address) .await .map_err(Error::Storage)?; self.store() .write_all( &Reference { namespace, digest: &digest, }, digest.to_string().as_bytes(), ) .await .map_err(Error::Storage)?; for layer in manifest.layers() { // FIXME: iirc each blob needs to be resolved let digest = layer.digest(); let digest_str = digest.to_string(); let digest_bytes = digest_str.as_bytes(); let layer_addr = LayerLink { namespace, digest }; self.storage .inner() .write_all(&layer_addr, digest_bytes) .await .map_err(Error::Storage)?; } let oci_config_digest = manifest.config().digest(); let manifest_embedded_data = manifest.config().data(); if let Some(manifest_embedded_data) = manifest_embedded_data { // let manifest_str = manifest.to_string_pretty().unwrap(); let decoded = BASE64_STANDARD.decode(manifest_embedded_data).unwrap(); self.storage .inner() .write_all( &Blob { digest: oci_config_digest, }, decoded.as_slice(), ) .await .map_err(Error::Storage)?; self.storage .inner() .write_all( &LayerLink { namespace, digest: oci_config_digest, }, decoded.as_slice(), ) .await .map_err(Error::Storage)?; } if let TagOrDigest::Tag(name) = tag_or_digest { self.store() .write_all(&Tag { namespace, name }, digest.to_string().as_bytes()) .await .map_err(Error::Storage)?; } Ok(()) } pub async fn read_manifest( &self, namespace: &Namespace, tag_or_digest: &TagOrDigest, ) -> Result { let blob_addr = match tag_or_digest { TagOrDigest::Tag(name) => self.resolve_link(&Tag { namespace, name }).await?, TagOrDigest::Digest(digest) => { self.resolve_link(&Reference { namespace, digest }).await? } }; let manifest_bytes = self .storage .inner() .read_bytes(&blob_addr) .await .map_err(Error::Storage)?; let manifest: ImageManifest = serde_json::de::from_slice(manifest_bytes.iter().as_slice()).expect("manifest invalid"); Ok(manifest) } pub async fn has_blob(&self, digest: &Digest) -> Result { let blob_addr = Blob::from(digest); self.store() .exists(&blob_addr) .await .map_err(Error::Storage) } pub async fn has_manifest( &self, namespace: &Namespace, tag_or_digest: &TagOrDigest, ) -> Result { let blob_addr = match tag_or_digest { TagOrDigest::Tag(name) => self.resolve_link(&Tag { namespace, name }).await?, TagOrDigest::Digest(digest) => { self.resolve_link(&Reference { namespace, digest }).await? } }; self.store() .exists(&blob_addr) .await .map_err(Error::Storage) } pub async fn read_blob(&self, digest: &Digest) -> Result { let blob_addr = Blob::from(digest); self.store().read(&blob_addr).await.map_err(Error::Storage) } }