src/storage_fs.rs
-rw-r--r-- 3.7 KiB
1use std::{
2 io::Cursor,
3 path::{Path, PathBuf},
4};
5
6use bytes::Bytes;
7use futures::StreamExt;
8use tokio::io::AsyncWriteExt;
9
10use crate::{
11 address::Addressable,
12 storage::{Error, InnerStream, StorageIface},
13};
14
/// Storage backend that keeps every object as a file below a base directory.
///
/// Cloning only copies the base path, so clones refer to the same on-disk
/// location.
#[derive(Clone, Debug)]
pub struct FileSystem {
    /// Directory that object addresses are resolved against.
    pub base: PathBuf,
}
19
20impl FileSystem {
21 pub fn init(&self) -> Result<(), Error> {
22 std::fs::create_dir_all(self.base.as_path())?;
23 Ok(())
24 }
25
26 pub async fn ensure_dir(&self, path: &Path) -> Result<(), Error> {
27 let parent = path.parent().unwrap();
28 tokio::fs::create_dir_all(parent).await?;
29 Ok(())
30 }
31}
32
33impl StorageIface for FileSystem {
34 async fn exists<A>(&self, addr: &A) -> Result<bool, Error>
35 where
36 A: Addressable + Send + Sync,
37 {
38 let path = addr.address().as_path(&self.base);
39 if tokio::fs::try_exists(&path)
40 .await
41 .is_ok_and(|exists| exists)
42 {
43 let md = tokio::fs::metadata(path).await?;
44 if md.is_file() { Ok(true) } else { Ok(false) }
45 } else {
46 Ok(false)
47 }
48 }
49
50 async fn write_all<A>(&self, addr: &A, bytes: &[u8]) -> Result<(), Error>
51 where
52 A: Addressable + Send + Sync,
53 {
54 let path = addr.address().as_path(&self.base);
55 self.ensure_dir(&path).await?;
56 let mut fp = tokio::fs::OpenOptions::new()
57 .create(true)
58 .truncate(true)
59 .write(true)
60 .open(&path)
61 .await?;
62 let mut cursor = Cursor::new(bytes);
63 fp.write_buf(&mut cursor).await?;
64 fp.flush().await?;
65 Ok(())
66 }
67
68 async fn write<A>(&self, addr: &A, bytes: &[u8]) -> Result<(), Error>
69 where
70 A: Addressable + Send + Sync,
71 {
72 let path = addr.address().as_path(&self.base);
73 let mut fp = tokio::fs::OpenOptions::new()
74 .create(false)
75 .truncate(false)
76 .write(true)
77 .open(path)
78 .await?;
79 let mut cursor = Cursor::new(bytes);
80 fp.write_buf(&mut cursor).await?;
81 fp.flush().await?;
82 Ok(())
83 }
84
85 async fn mv<A, B>(&self, src: &A, dst: &B) -> Result<(), Error>
86 where
87 A: Addressable + Send + Sync,
88 B: Addressable + Send + Sync,
89 {
90 let src_path = src.address().as_path(&self.base);
91 let dst_path = dst.address().as_path(&self.base);
92 self.ensure_dir(&dst_path).await?;
93 tracing::info!("mv {:?} -> {:?}", src_path, dst_path);
94 tokio::fs::rename(src_path, dst_path).await?;
95 Ok(())
96 }
97
98 async fn read<A>(&self, src: &A) -> Result<InnerStream, Error>
99 where
100 A: Addressable + Send + Sync,
101 {
102 let path = src.address().as_path(&self.base);
103 let fp = tokio::fs::File::open(path.as_path())
104 .await
105 .map_err(|e| match e.kind() {
106 std::io::ErrorKind::NotFound => Error::NotFound,
107 _ => Error::Io(e),
108 })?;
109 let stream = tokio_util::io::ReaderStream::new(fp);
110 Ok(InnerStream::new(stream.boxed()))
111 }
112
113 async fn read_bytes<A>(&self, src: &A) -> Result<Bytes, Error>
114 where
115 A: Addressable + Send + Sync,
116 {
117 let path = src.address().as_path(&self.base);
118 let payload = tokio::fs::read(path.as_path())
119 .await
120 .map_err(|e| match e.kind() {
121 std::io::ErrorKind::NotFound => Error::NotFound,
122 _ => Error::Io(e),
123 })?;
124 Ok(Bytes::from(payload))
125 }
126
127 async fn delete<A>(&self, src: &A) -> Result<(), Error>
128 where
129 A: Addressable + Send + Sync
130 {
131 let path = src.address().as_path(&self.base);
132 tokio::fs::remove_file(path.as_path()).await?;
133 Ok(())
134 }
135}