Commit
+52 -67 +/-3 browse
1 | diff --git a/maitred/src/delivery.rs b/maitred/src/delivery.rs |
2 | index 1da585c..56a4274 100644 |
3 | --- a/maitred/src/delivery.rs |
4 | +++ b/maitred/src/delivery.rs |
5 | @@ -1,12 +1,6 @@ |
6 | - use std::{ |
7 | - collections::BTreeMap, |
8 | - future::Future, |
9 | - io::Error as IoError, |
10 | - path::{Path, PathBuf}, |
11 | - }; |
12 | + use std::{collections::BTreeMap, future::Future, io::Error as IoError, path::Path}; |
13 | |
14 | use async_trait::async_trait; |
15 | - use mail_parser::Message; |
16 | use maildir::Maildir as MaildirInner; |
17 | |
18 | use crate::Envelope; |
19 | diff --git a/maitred/src/server.rs b/maitred/src/server.rs |
20 | index f7bc261..a3931ed 100644 |
21 | --- a/maitred/src/server.rs |
22 | +++ b/maitred/src/server.rs |
23 | @@ -1,5 +1,5 @@ |
24 | use std::fs::File as StdFile; |
25 | - use std::io::{BufReader as StdBufReader, Read, Write}; |
26 | + use std::io::BufReader as StdBufReader; |
27 | use std::net::SocketAddr; |
28 | use std::path::{Path, PathBuf}; |
29 | use std::sync::Arc; |
30 | @@ -13,26 +13,22 @@ use futures::SinkExt; |
31 | use futures::StreamExt; |
32 | use mail_auth::Resolver; |
33 | use mail_parser::Message; |
34 | - use rustls::ServerConnection; |
35 | use smtp_proto::Request; |
36 | - use tokio::io::BufReader; |
37 | use tokio::io::BufStream; |
38 | use tokio::net::TcpListener; |
39 | - use tokio::net::TcpStream; |
40 | use tokio::sync::mpsc::Sender; |
41 | use tokio::sync::Mutex; |
42 | use tokio::task::JoinHandle; |
43 | use tokio::time::timeout; |
44 | use tokio_rustls::{rustls, TlsAcceptor}; |
45 | use tokio_stream::{self as stream}; |
46 | - use tokio_util::codec::{Framed, LengthDelimitedCodec}; |
47 | + use tokio_util::codec::Framed; |
48 | use url::Host; |
49 | |
50 | use crate::delivery::Delivery; |
51 | use crate::error::Error; |
52 | use crate::milter::Milter; |
53 | use crate::session::{Session, SessionOptions}; |
54 | - use crate::smtp_response; |
55 | use crate::transport::{Command, Transport, TransportError}; |
56 | use crate::worker::Worker; |
57 | use crate::{Response, SmtpResponse}; |
58 | @@ -203,14 +199,17 @@ impl Server { |
59 | .with_single_cert(certs, private_key)?) |
60 | } |
61 | |
62 | - async fn serve_tls( |
63 | + async fn serve_tls<T>( |
64 | &self, |
65 | - stream: &mut BufStream<TcpStream>, |
66 | + stream: &mut BufStream<T>, |
67 | mut session: Session, |
68 | msg_queue: Arc<Injector<Envelope>>, |
69 | pipelining: bool, |
70 | send_greeting: bool, |
71 | - ) -> Result<(), ServerError> { |
72 | + ) -> Result<(), ServerError> |
73 | + where |
74 | + T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin, |
75 | + { |
76 | let acceptor = TlsAcceptor::from(Arc::new(self.rustls_config().await?)); |
77 | let tls_stream = acceptor.accept(stream).await?; |
78 | |
79 | @@ -292,12 +291,15 @@ impl Server { |
80 | Ok(()) |
81 | } |
82 | |
83 | - async fn serve_plain( |
84 | + async fn serve_plain<T>( |
85 | &self, |
86 | - stream: BufStream<TcpStream>, |
87 | + stream: BufStream<T>, |
88 | msg_queue: Arc<Injector<Envelope>>, |
89 | pipelining: bool, |
90 | - ) -> Result<(), ServerError> { |
91 | + ) -> Result<(), ServerError> |
92 | + where |
93 | + T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin, |
94 | + { |
95 | let mut session_opts = self.options.clone().unwrap_or_default(); |
96 | |
97 | if let Some(addr) = self.current_addr { |
98 | @@ -489,8 +491,6 @@ impl Server { |
99 | #[cfg(test)] |
100 | mod test { |
101 | |
102 | - /* |
103 | - |
104 | use crate::SessionOptions; |
105 | |
106 | use super::*; |
107 | @@ -548,6 +548,7 @@ mod test { |
108 | todo!() |
109 | } |
110 | } |
111 | + |
112 | #[tokio::test] |
113 | async fn test_server() { |
114 | let stream = FakeStream { |
115 | @@ -564,9 +565,11 @@ mod test { |
116 | let server = Server::default() |
117 | // turn off all extended capabilities |
118 | .with_session_opts(SessionOptions::default().capabilities(0)); |
119 | - let framed = Framed::new(stream, Transport::default()); |
120 | let global_queue = Arc::new(Injector::<Envelope>::new()); |
121 | - server.process(framed, global_queue.clone()).await.unwrap(); |
122 | + server |
123 | + .serve_plain(BufStream::new(stream), global_queue.clone(), false) |
124 | + .await |
125 | + .unwrap(); |
126 | let packet = global_queue.steal().success().unwrap(); |
127 | assert!(packet.mail_from.email() == "fuu@bar.com"); |
128 | assert!(packet |
129 | @@ -589,9 +592,11 @@ mod test { |
130 | ..Default::default() |
131 | }; |
132 | let server = Server::default(); |
133 | - let framed = Framed::new(stream, Transport::default()); |
134 | let global_queue = Arc::new(Injector::<Envelope>::new()); |
135 | - server.process(framed, global_queue.clone()).await.unwrap(); |
136 | + server |
137 | + .serve_plain(BufStream::new(stream), global_queue.clone(), false) |
138 | + .await |
139 | + .unwrap(); |
140 | let packet = global_queue.steal().success().unwrap(); |
141 | assert!(packet.mail_from.email() == "fuu@bar.com"); |
142 | assert!(packet |
143 | @@ -599,5 +604,4 @@ mod test { |
144 | .first() |
145 | .is_some_and(|rcpt_to| rcpt_to.email() == "baz@qux.com")); |
146 | } |
147 | - */ |
148 | } |
149 | diff --git a/maitred/src/session.rs b/maitred/src/session.rs |
150 | index 112721e..b1e3cdc 100644 |
151 | --- a/maitred/src/session.rs |
152 | +++ b/maitred/src/session.rs |
153 | @@ -736,11 +736,8 @@ mod test { |
154 | )), |
155 | }]; |
156 | let session = Mutex::new( |
157 | - Session::default().with_options( |
158 | - SessionOptions::default() |
159 | - .our_hostname(EXAMPLE_HOSTNAME) |
160 | - .into(), |
161 | - ), |
162 | + Session::default() |
163 | + .with_options(SessionOptions::default().our_hostname(EXAMPLE_HOSTNAME)), |
164 | ); |
165 | process_all(&session, requests).await; |
166 | } |
167 | @@ -778,22 +775,18 @@ mod test { |
168 | expected: Ok(vec![smtp_response!(221, 0, 0, 0, String::from("Ciao!"))]), |
169 | }, |
170 | ]; |
171 | - let session = Mutex::new( |
172 | - Session::default().with_options( |
173 | - SessionOptions::default() |
174 | - .list_expansion(crate::expand::ExpansionFunc(|name: &str| { |
175 | - let name = name.to_string(); |
176 | - async move { |
177 | - assert!(name == "mailing-list"); |
178 | - Ok(vec![ |
179 | - EmailAddress::new_unchecked("Fuu <fuu@bar.com>"), |
180 | - EmailAddress::new_unchecked("Baz <baz@qux.com>"), |
181 | - ]) |
182 | - } |
183 | - })) |
184 | - .into(), |
185 | - ), |
186 | - ); |
187 | + let session = Mutex::new(Session::default().with_options( |
188 | + SessionOptions::default().list_expansion(crate::expand::ExpansionFunc(|name: &str| { |
189 | + let name = name.to_string(); |
190 | + async move { |
191 | + assert!(name == "mailing-list"); |
192 | + Ok(vec![ |
193 | + EmailAddress::new_unchecked("Fuu <fuu@bar.com>"), |
194 | + EmailAddress::new_unchecked("Baz <baz@qux.com>"), |
195 | + ]) |
196 | + } |
197 | + })), |
198 | + )); |
199 | process_all(&session, requests).await; |
200 | // session should contain both requests |
201 | let session = session.lock().await; |
202 | @@ -832,19 +825,17 @@ mod test { |
203 | expected: Ok(vec![smtp_response!(221, 0, 0, 0, String::from("Ciao!"))]), |
204 | }, |
205 | ]; |
206 | - let session = Mutex::new( |
207 | - Session::default().with_options( |
208 | - SessionOptions::default() |
209 | - .verification(crate::verify::VerifyFunc(|addr: &EmailAddress| { |
210 | - let addr = addr.clone(); |
211 | - async move { |
212 | - assert!(addr.email() == "bar@baz.com"); |
213 | - Ok(()) |
214 | - } |
215 | - })) |
216 | - .into(), |
217 | - ), |
218 | - ); |
219 | + let session = Mutex::new(Session::default().with_options( |
220 | + SessionOptions::default().verification(crate::verify::VerifyFunc( |
221 | + |addr: &EmailAddress| { |
222 | + let addr = addr.clone(); |
223 | + async move { |
224 | + assert!(addr.email() == "bar@baz.com"); |
225 | + Ok(()) |
226 | + } |
227 | + }, |
228 | + )), |
229 | + )); |
230 | process_all(&session, requests).await; |
231 | // session should contain both requests |
232 | let session = session.lock().await; |
233 | @@ -934,8 +925,7 @@ mod test { |
234 | Session::default().with_options( |
235 | SessionOptions::default() |
236 | .our_hostname(EXAMPLE_HOSTNAME) |
237 | - .capabilities(DEFAULT_CAPABILITIES) |
238 | - .into(), |
239 | + .capabilities(DEFAULT_CAPABILITIES), |
240 | ), |
241 | ); |
242 | process_all(&session, requests).await; |
243 | @@ -999,11 +989,8 @@ transport rather than the session. |
244 | }, |
245 | ]; |
246 | let session = Mutex::new( |
247 | - Session::default().with_options( |
248 | - SessionOptions::default() |
249 | - .our_hostname(EXAMPLE_HOSTNAME) |
250 | - .into(), |
251 | - ), |
252 | + Session::default() |
253 | + .with_options(SessionOptions::default().our_hostname(EXAMPLE_HOSTNAME)), |
254 | ); |
255 | process_all(&session, requests).await; |
256 | let session = session.lock().await; |