use std::{ io::Cursor, path::{Path, PathBuf}, }; use bytes::Bytes; use futures::StreamExt; use tokio::io::AsyncWriteExt; use crate::{ address::Addressable, storage::{Error, InnerStream, StorageIface}, }; #[derive(Clone)] pub struct FileSystem { pub base: PathBuf, } impl FileSystem { pub fn init(&self) -> Result<(), Error> { std::fs::create_dir_all(self.base.as_path())?; Ok(()) } pub async fn ensure_dir(&self, path: &Path) -> Result<(), Error> { let parent = path.parent().unwrap(); tokio::fs::create_dir_all(parent).await?; Ok(()) } } impl StorageIface for FileSystem { async fn exists(&self, addr: &A) -> Result where A: Addressable + Send + Sync, { let path = addr.address().as_path(&self.base); if tokio::fs::try_exists(&path) .await .is_ok_and(|exists| exists) { let md = tokio::fs::metadata(path).await?; if md.is_file() { Ok(true) } else { Ok(false) } } else { Ok(false) } } async fn write_all(&self, addr: &A, bytes: &[u8]) -> Result<(), Error> where A: Addressable + Send + Sync, { let path = addr.address().as_path(&self.base); self.ensure_dir(&path).await?; let mut fp = tokio::fs::OpenOptions::new() .create(true) .truncate(true) .write(true) .open(&path) .await?; let mut cursor = Cursor::new(bytes); fp.write_buf(&mut cursor).await?; fp.flush().await?; Ok(()) } async fn write(&self, addr: &A, bytes: &[u8]) -> Result<(), Error> where A: Addressable + Send + Sync, { let path = addr.address().as_path(&self.base); let mut fp = tokio::fs::OpenOptions::new() .create(false) .truncate(false) .write(true) .open(path) .await?; let mut cursor = Cursor::new(bytes); fp.write_buf(&mut cursor).await?; fp.flush().await?; Ok(()) } async fn mv(&self, src: &A, dst: &B) -> Result<(), Error> where A: Addressable + Send + Sync, B: Addressable + Send + Sync, { let src_path = src.address().as_path(&self.base); let dst_path = dst.address().as_path(&self.base); self.ensure_dir(&dst_path).await?; tracing::info!("mv {:?} -> {:?}", src_path, dst_path); tokio::fs::rename(src_path, dst_path).await?; Ok(()) } async fn read(&self, src: &A) -> Result where A: Addressable + Send + Sync, { let path = src.address().as_path(&self.base); let fp = tokio::fs::File::open(path.as_path()) .await .map_err(|e| match e.kind() { std::io::ErrorKind::NotFound => Error::NotFound, _ => Error::Io(e), })?; let stream = tokio_util::io::ReaderStream::new(fp); Ok(InnerStream::new(stream.boxed())) } async fn read_bytes(&self, src: &A) -> Result where A: Addressable + Send + Sync, { let path = src.address().as_path(&self.base); let payload = tokio::fs::read(path.as_path()) .await .map_err(|e| match e.kind() { std::io::ErrorKind::NotFound => Error::NotFound, _ => Error::Io(e), })?; Ok(Bytes::from(payload)) } async fn delete(&self, src: &A) -> Result<(), Error> where A: Addressable + Send + Sync { let path = src.address().as_path(&self.base); tokio::fs::remove_file(path.as_path()).await?; Ok(()) } }