Author: Kevin Schoon [me@kevinschoon.com]
Hash: 5d9ab94e0fa4cab1138034cabda1ffe1c87b04fc
Timestamp: Sat, 17 Aug 2024 22:17:39 +0000 (2 months ago)

+157 -74 +/-6 browse
reduce allocations and copying
1diff --git a/cmd/maitred-debug/src/main.rs b/cmd/maitred-debug/src/main.rs
2index e4c3141..181581e 100644
3--- a/cmd/maitred-debug/src/main.rs
4+++ b/cmd/maitred-debug/src/main.rs
5 @@ -1,6 +1,22 @@
6- use maitred::{mail_parser::Message, Error, MilterFunc, Server, SessionOptions};
7+ use maitred::{
8+ mail_parser::Message, DeliveryError, DeliveryFunc, Error, MilterFunc, Server, SessionOptions,
9+ };
10 use tracing::Level;
11
12+ async fn print_message(message: Message<'static>) -> Result<(), DeliveryError> {
13+ println!("New SMTP Message:");
14+ println!("{:?}", message.headers());
15+ println!("Subject: {:?}", message.subject());
16+ println!(
17+ "{}",
18+ message
19+ .body_text(0)
20+ .map(|text| String::from_utf8_lossy(text.as_bytes()).to_string())
21+ .unwrap_or_default()
22+ );
23+ Ok(())
24+ }
25+
26 #[tokio::main]
27 async fn main() -> Result<(), Error> {
28 // Create a subscriber that logs events to the console
29 @@ -12,9 +28,11 @@ async fn main() -> Result<(), Error> {
30 // Set the subscriber as the default subscriber
31 let mut mail_server = Server::default()
32 .address("127.0.0.1:2525")
33- .with_milter(MilterFunc::new(|message: &Message| {
34- let cloned = message.clone();
35- Box::pin(async move { Ok(cloned.to_owned()) })
36+ .with_milter(MilterFunc::new(|message: Message<'static>| {
37+ Box::pin(async move { Ok(message.to_owned()) })
38+ }))
39+ .with_delivery(DeliveryFunc::new(|message: Message<'static>| {
40+ Box::pin(async move { print_message(message.to_owned()).await })
41 }))
42 .with_session_opts(SessionOptions::default());
43 mail_server.listen().await?;
44 diff --git a/maitred/src/delivery.rs b/maitred/src/delivery.rs
45new file mode 100644
46index 0000000..3a8b41e
47--- /dev/null
48+++ b/maitred/src/delivery.rs
49 @@ -0,0 +1,48 @@
50+ use std::result::Result as StdResult;
51+ use std::{future::Future, pin::Pin, sync::Arc};
52+
53+ use async_trait::async_trait;
54+ use mail_parser::Message;
55+
56+ /// Result type containing a new message with possible modifications or an
57+ /// error indicating it should not be processed.
58+ pub type Result = StdResult<(), Error>;
59+
60+ #[derive(Debug, thiserror::Error)]
61+ pub enum Error {
62+ /// Indicates an unspecified error that occurred during milting.
63+ #[error("Internal Server Error: {0}")]
64+ Server(String),
65+ }
66+
67+ /// Delivery is the final step processing mail and can implement
68+ #[async_trait]
69+ pub trait Delivery {
70+ /// Apply the milter function to the incoming message
71+ async fn deliver(&self, message: Message<'static>) -> Result;
72+ }
73+
74+ #[derive(Clone)]
75+ pub struct Func<F>(Arc<F>)
76+ where
77+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync;
78+
79+ impl<F> Func<F>
80+ where
81+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync,
82+ {
83+ pub fn new(func: F) -> Self {
84+ Func(Arc::new(func))
85+ }
86+ }
87+
88+ #[async_trait]
89+ impl<F> Delivery for Func<F>
90+ where
91+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync,
92+ {
93+ async fn deliver(&self, message: Message<'static>) -> Result {
94+ let f = (self.0)(message);
95+ f.await
96+ }
97+ }
98 diff --git a/maitred/src/lib.rs b/maitred/src/lib.rs
99index 42a6a02..ab73ef2 100644
100--- a/maitred/src/lib.rs
101+++ b/maitred/src/lib.rs
102 @@ -2,9 +2,26 @@
103 //! within a Rust program.
104 //! # Example SMTP Server
105 //! ```rust
106- //! use maitred::{Error, Server};
107+ //! use maitred::{
108+ //! mail_parser::Message, DeliveryError, DeliveryFunc, Error,
109+ //! MilterFunc, Server, SessionOptions,
110+ //! };
111 //! use tracing::Level;
112 //!
113+ //! async fn print_message(message: Message<'static>) -> Result<(), DeliveryError> {
114+ //! println!("New SMTP Message:");
115+ //! println!("{:?}", message.headers());
116+ //! println!("Subject: {:?}", message.subject());
117+ //! println!(
118+ //! "{}",
119+ //! message
120+ //! .body_text(0)
121+ //! .map(|text| String::from_utf8_lossy(text.as_bytes()).to_string())
122+ //! .unwrap_or_default()
123+ //! );
124+ //! Ok(())
125+ //! }
126+ //!
127 //! #[tokio::main]
128 //! async fn main() -> Result<(), Error> {
129 //! // Create a subscriber that logs events to the console
130 @@ -13,14 +30,22 @@
131 //! .with_line_number(true)
132 //! .with_max_level(Level::DEBUG)
133 //! .init();
134- //!
135 //! // Set the subscriber as the default subscriber
136- //! let mail_server = Server::new("localhost").address("127.0.0.1:2525");
137- //! // mail_server.listen().await?;
138+ //! let mut mail_server = Server::default()
139+ //! .address("127.0.0.1:2525")
140+ //! .with_milter(MilterFunc::new(|message: Message<'static>| {
141+ //! Box::pin(async move { Ok(message.to_owned()) })
142+ //! }))
143+ //! .with_delivery(DeliveryFunc::new(|message: Message<'static>| {
144+ //! Box::pin(async move { print_message(message.to_owned()).await })
145+ //! }))
146+ //! .with_session_opts(SessionOptions::default());
147+ //! mail_server.listen().await?;
148 //! Ok(())
149 //! }
150 //! ```
151
152+ mod delivery;
153 mod error;
154 mod expand;
155 mod milter;
156 @@ -34,9 +59,11 @@ mod worker;
157 use smtp_proto::{Request, Response as SmtpResponse};
158 use transport::Response;
159
160- pub use error::Error;
161+ pub use delivery::{Delivery, Error as DeliveryError, Func as DeliveryFunc};
162 pub use expand::{Error as ExpansionError, Expansion, Func as ExpansionFunc};
163 pub use milter::{Error as MilterError, Func as MilterFunc, Milter};
164+
165+ pub use error::Error;
166 pub use pipeline::{Pipeline, Transaction};
167 pub use server::Server;
168 pub use session::{
169 diff --git a/maitred/src/milter.rs b/maitred/src/milter.rs
170index ef0ec99..a665628 100644
171--- a/maitred/src/milter.rs
172+++ b/maitred/src/milter.rs
173 @@ -10,7 +10,6 @@ use mail_parser::Message;
174 /// error indicating it should not be processed.
175 pub type Result = StdResult<Message<'static>, Error>;
176
177- /// An error encountered while expanding a mail address
178 #[derive(Debug, thiserror::Error)]
179 pub enum Error {
180 /// Indicates an unspecified error that occurred during milting.
181 @@ -18,13 +17,13 @@ pub enum Error {
182 Server(String),
183 }
184
185- /// A Milter (https://en.wikipedia.org/wiki/Milter) accepts an email message
186+ /// A [Milter](https://en.wikipedia.org/wiki/Milter) accepts an email message
187 /// and performs some permutation, modification, or rejection and then returns
188 /// the message.
189 #[async_trait]
190 pub trait Milter {
191 /// Apply the milter function to the incoming message
192- async fn apply(&self, message: &Message<'static>) -> Result;
193+ async fn apply(&self, message: Message<'static>) -> Result;
194 }
195
196 /// Wrapper implementing the Milter trait from a closure
197 @@ -33,7 +32,7 @@ pub trait Milter {
198 /// use mail_parser::Message;
199 /// use maitred::MilterFunc;
200 ///
201- /// let my_expn_fn = MilterFunc(|message: &Message| {
202+ /// let milter = MilterFunc::new(|message: Message<'static>| {
203 /// Box::pin(async move {
204 /// // rewrite message here
205 /// Ok(Message::default().to_owned())
206 @@ -43,17 +42,15 @@ pub trait Milter {
207 #[derive(Clone)]
208 pub struct Func<F>(Arc<F>)
209 where
210- F: Fn(&Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>>
211+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>>
212 + Send
213- + Sync
214- + 'static;
215+ + Sync;
216
217 impl<F> Func<F>
218 where
219- F: Fn(&Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>>
220+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>>
221 + Send
222 + Sync
223- + 'static,
224 {
225 pub fn new(func: F) -> Self {
226 Func(Arc::new(func))
227 @@ -63,12 +60,11 @@ where
228 #[async_trait]
229 impl<F> Milter for Func<F>
230 where
231- F: Fn(&Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>>
232+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>>
233 + Send
234 + Sync
235- + 'static,
236 {
237- async fn apply(&self, message: &Message<'static>) -> Result {
238+ async fn apply(&self, message: Message<'static>) -> Result {
239 let f = (self.0)(message);
240 f.await
241 }
242 diff --git a/maitred/src/server.rs b/maitred/src/server.rs
243index 487c925..0b371b7 100644
244--- a/maitred/src/server.rs
245+++ b/maitred/src/server.rs
246 @@ -21,6 +21,7 @@ use crate::pipeline::Pipeline;
247 use crate::session::Session;
248 use crate::transport::{Response, Transport};
249 use crate::worker::{Packet, Worker};
250+ use crate::Delivery;
251 use crate::{Chunk, Milter};
252
253 /// The default port the server will listen on if none was specified in it's
254 @@ -61,48 +62,43 @@ impl ConditionalPipeline<'_> {
255 /// Server implements everything that is required to run an SMTP server by
256 /// binding to the configured address and processing individual TCP connections
257 /// as they are received.
258- pub struct Server<M>
259+ pub struct Server<D, M>
260 where
261- M: Milter + Clone + Send + Sync + 'static,
262+ D: Delivery + Clone + Send + Sync,
263+ M: Milter + Clone + Send + Sync,
264 {
265 address: String,
266- hostname: String,
267 global_timeout: Duration,
268 options: Option<Rc<crate::session::Options>>,
269- milters: Vec<Arc<M>>,
270+ milter: Option<Arc<M>>,
271+ delivery: Option<Arc<D>>,
272 n_threads: usize,
273 shutdown_handles: Vec<Sender<bool>>,
274 }
275
276- impl<M> Default for Server<M>
277+ impl<D, M> Default for Server<D, M>
278 where
279- M: Milter + Clone + Send + Sync + 'static,
280+ M: Milter + Clone + Send + Sync,
281+ D: Delivery + Clone + Send + Sync,
282 {
283 fn default() -> Self {
284- Server::<M> {
285+ Server::<D, M> {
286 address: DEFAULT_LISTEN_ADDR.to_string(),
287- hostname: String::default(),
288 global_timeout: Duration::from_secs(DEFAULT_GLOBAL_TIMEOUT_SECS),
289 options: None,
290- milters: vec![],
291+ milter: None,
292+ delivery: None,
293 n_threads: std::thread::available_parallelism().unwrap().into(),
294 shutdown_handles: vec![],
295 }
296 }
297 }
298
299- impl<M> Server<M>
300+ impl<D, M> Server<D, M>
301 where
302 M: Milter + Clone + Send + Sync + 'static,
303+ D: Delivery + Clone + Send + Sync + 'static,
304 {
305- /// Initialize a new SMTP server
306- pub fn new(hostname: &str) -> Self {
307- Server::<M> {
308- hostname: hostname.to_string(),
309- ..Default::default()
310- }
311- }
312-
313 /// Listener address for the SMTP server to bind to listen for incoming
314 /// connections.
315 pub fn address(mut self, address: &str) -> Self {
316 @@ -125,10 +121,15 @@ where
317 self
318 }
319
320- /// Append one or more milters which will be applied to messages after the
321- /// session has been closed but before they are enqued for delivery.
322+ /// Process each message with the provided milter before it is delivered
323 pub fn with_milter(mut self, milter: M) -> Self {
324- self.milters.push(Arc::new(milter));
325+ self.milter = Some(Arc::new(milter));
326+ self
327+ }
328+
329+ /// Delivery handles the delivery of the final message
330+ pub fn with_delivery(mut self, delivery: D) -> Self {
331+ self.delivery = Some(Arc::new(delivery));
332 self
333 }
334
335 @@ -201,10 +202,13 @@ where
336 self.shutdown_handles.push(tx);
337 let global_queue = global_queue.clone();
338 let stealers = stealers.clone();
339- let milters = self.milters.clone();
340+ let milter = self
341+ .milter
342+ .as_ref()
343+ .map_or(None, |milter| Some(milter.clone()));
344 tokio::task::spawn(async move {
345 let mut worker = Worker {
346- milters,
347+ milter,
348 global_queue,
349 stealers,
350 local_queue: Arc::new(Mutex::new(local_queue)),
351 @@ -268,7 +272,7 @@ where
352 #[cfg(test)]
353 mod test {
354
355- use crate::MilterFunc;
356+ use crate::{DeliveryFunc, MilterFunc};
357
358 use super::*;
359
360 @@ -327,7 +331,6 @@ mod test {
361 }
362 }
363
364- /*
365 #[tokio::test]
366 async fn test_server_process() {
367 let stream = FakeStream {
368 @@ -341,10 +344,13 @@ mod test {
369 ],
370 ..Default::default()
371 };
372- let server = Server::new("example.org").with_milter(MilterFunc(|message: &Message| {
373- println!("{:?}", message);
374- async move { Ok(Message::default().into_owned()) }
375- }));
376+ let server = Server::default()
377+ .with_milter(MilterFunc::new(|_: Message<'static>| {
378+ Box::pin(async move { Ok(Message::default().into_owned()) })
379+ }))
380+ .with_delivery(DeliveryFunc::new(|_: Message<'static>| {
381+ Box::pin(async move { Ok(()) })
382+ }));
383 let framed = Framed::new(stream, Transport::default());
384 let session = Session::default();
385 let greeting = session.greeting();
386 @@ -367,5 +373,4 @@ mod test {
387 .first()
388 .is_some_and(|rcpt_to| rcpt_to.email() == "baz@qux.com")));
389 }
390- */
391 }
392 diff --git a/maitred/src/worker.rs b/maitred/src/worker.rs
393index a0fd512..86e5878 100644
394--- a/maitred/src/worker.rs
395+++ b/maitred/src/worker.rs
396 @@ -40,9 +40,9 @@ impl From<Session> for Packet {
397 /// SPF Verification
398 pub(crate) struct Worker<M>
399 where
400- M: Milter + Clone + Send + Sync + 'static,
401+ M: Milter + Clone + Send + Sync,
402 {
403- pub milters: Vec<Arc<M>>,
404+ pub milter: Option<Arc<M>>,
405 pub global_queue: Arc<Injector<Packet>>,
406 pub stealers: Vec<Stealer<Packet>>,
407 pub local_queue: Arc<Mutex<WorkQueue<Packet>>>,
408 @@ -78,27 +78,16 @@ where
409 }
410
411 if let Some(packet) = self.next_packet().await {
412- let message = packet.body.unwrap();
413- // apply all of the milters to the message returning the final
414- // result. Any failure prevents the modified message from being
415- // returned and will result in a rejection.
416- let modified: Result<Message<'static>, crate::milter::Error> =
417- stream::iter(self.milters.clone())
418- .fold(Ok(message), |accm, milter| async move {
419- if let Ok(message) = accm {
420- milter.apply(&message).await
421- } else {
422- accm
423- }
424- })
425- .await;
426-
427- match modified {
428- Ok(message) => {
429- tracing::info!("Message finished: {:?}", message);
430- }
431- Err(err) => {
432- tracing::warn!("Milter failed: {}", err)
433+ let mut message = packet.body.unwrap();
434+ if let Some(milter) = self.milter.as_ref() {
435+ match milter.apply(message).await {
436+ Ok(modified) => {
437+ tracing::info!("Milter finished successfully");
438+ message = modified;
439+ }
440+ Err(err) => {
441+ tracing::warn!("Milter failed to apply: {:?}", err);
442+ }
443 }
444 }
445 } else {