1 | use std::{ |
2 | io::Cursor, |
3 | path::{Path, PathBuf}, |
4 | }; |
5 | |
6 | use bytes::Bytes; |
7 | use futures::StreamExt; |
8 | use tokio::io::AsyncWriteExt; |
9 | |
10 | use crate::{ |
11 | address::Addressable, |
12 | storage::{Error, InnerStream, StorageIface}, |
13 | }; |
14 | |
/// Storage backend that keeps every object as a file on the local filesystem.
///
/// Object addresses are resolved to paths relative to [`FileSystem::base`].
#[derive(Clone, Debug)]
pub struct FileSystem {
    /// Root directory under which all addressed objects are stored.
    pub base: PathBuf,
}
19 | |
20 | impl FileSystem { |
21 | pub fn init(&self) -> Result<(), Error> { |
22 | std::fs::create_dir_all(self.base.as_path())?; |
23 | Ok(()) |
24 | } |
25 | |
26 | pub async fn ensure_dir(&self, path: &Path) -> Result<(), Error> { |
27 | let parent = path.parent().unwrap(); |
28 | tokio::fs::create_dir_all(parent).await?; |
29 | Ok(()) |
30 | } |
31 | } |
32 | |
33 | impl StorageIface for FileSystem { |
34 | async fn exists<A>(&self, addr: &A) -> Result<bool, Error> |
35 | where |
36 | A: Addressable + Send + Sync, |
37 | { |
38 | let path = addr.address().as_path(&self.base); |
39 | if tokio::fs::try_exists(&path) |
40 | .await |
41 | .is_ok_and(|exists| exists) |
42 | { |
43 | let md = tokio::fs::metadata(path).await?; |
44 | if md.is_file() { Ok(true) } else { Ok(false) } |
45 | } else { |
46 | Ok(false) |
47 | } |
48 | } |
49 | |
50 | async fn write_all<A>(&self, addr: &A, bytes: &[u8]) -> Result<(), Error> |
51 | where |
52 | A: Addressable + Send + Sync, |
53 | { |
54 | let path = addr.address().as_path(&self.base); |
55 | self.ensure_dir(&path).await?; |
56 | let mut fp = tokio::fs::OpenOptions::new() |
57 | .create(true) |
58 | .truncate(true) |
59 | .write(true) |
60 | .open(&path) |
61 | .await?; |
62 | let mut cursor = Cursor::new(bytes); |
63 | fp.write_buf(&mut cursor).await?; |
64 | fp.flush().await?; |
65 | Ok(()) |
66 | } |
67 | |
68 | async fn write<A>(&self, addr: &A, bytes: &[u8]) -> Result<(), Error> |
69 | where |
70 | A: Addressable + Send + Sync, |
71 | { |
72 | let path = addr.address().as_path(&self.base); |
73 | let mut fp = tokio::fs::OpenOptions::new() |
74 | .create(false) |
75 | .truncate(false) |
76 | .write(true) |
77 | .open(path) |
78 | .await?; |
79 | let mut cursor = Cursor::new(bytes); |
80 | fp.write_buf(&mut cursor).await?; |
81 | fp.flush().await?; |
82 | Ok(()) |
83 | } |
84 | |
85 | async fn mv<A, B>(&self, src: &A, dst: &B) -> Result<(), Error> |
86 | where |
87 | A: Addressable + Send + Sync, |
88 | B: Addressable + Send + Sync, |
89 | { |
90 | let src_path = src.address().as_path(&self.base); |
91 | let dst_path = dst.address().as_path(&self.base); |
92 | self.ensure_dir(&dst_path).await?; |
93 | tracing::info!("mv {:?} -> {:?}", src_path, dst_path); |
94 | tokio::fs::rename(src_path, dst_path).await?; |
95 | Ok(()) |
96 | } |
97 | |
98 | async fn read<A>(&self, src: &A) -> Result<InnerStream, Error> |
99 | where |
100 | A: Addressable + Send + Sync, |
101 | { |
102 | let path = src.address().as_path(&self.base); |
103 | let fp = tokio::fs::File::open(path.as_path()) |
104 | .await |
105 | .map_err(|e| match e.kind() { |
106 | std::io::ErrorKind::NotFound => Error::NotFound, |
107 | _ => Error::Io(e), |
108 | })?; |
109 | let stream = tokio_util::io::ReaderStream::new(fp); |
110 | Ok(InnerStream::new(stream.boxed())) |
111 | } |
112 | |
113 | async fn read_bytes<A>(&self, src: &A) -> Result<Bytes, Error> |
114 | where |
115 | A: Addressable + Send + Sync, |
116 | { |
117 | let path = src.address().as_path(&self.base); |
118 | let payload = tokio::fs::read(path.as_path()) |
119 | .await |
120 | .map_err(|e| match e.kind() { |
121 | std::io::ErrorKind::NotFound => Error::NotFound, |
122 | _ => Error::Io(e), |
123 | })?; |
124 | Ok(Bytes::from(payload)) |
125 | } |
126 | |
127 | async fn delete<A>(&self, src: &A) -> Result<(), Error> |
128 | where |
129 | A: Addressable + Send + Sync |
130 | { |
131 | let path = src.address().as_path(&self.base); |
132 | tokio::fs::remove_file(path.as_path()).await?; |
133 | Ok(()) |
134 | } |
135 | } |