Author: Kevin Schoon [me@kevinschoon.com]
Hash: 09a902eb02cee2ea5df8a088f5dcdd79a3982053
Timestamp: Mon, 19 Aug 2024 23:04:38 +0000 (1 month ago)

+109 -104 +/-6 browse
drop traits in favor of hkts for milter and delivery
1diff --git a/cmd/maitred-debug/src/main.rs b/cmd/maitred-debug/src/main.rs
2index 181581e..f859a08 100644
3--- a/cmd/maitred-debug/src/main.rs
4+++ b/cmd/maitred-debug/src/main.rs
5 @@ -1,5 +1,5 @@
6 use maitred::{
7- mail_parser::Message, DeliveryError, DeliveryFunc, Error, MilterFunc, Server, SessionOptions,
8+ mail_parser::Message, Delivery, DeliveryError, Error, Milter, Server, SessionOptions,
9 };
10 use tracing::Level;
11
12 @@ -28,10 +28,10 @@ async fn main() -> Result<(), Error> {
13 // Set the subscriber as the default subscriber
14 let mut mail_server = Server::default()
15 .address("127.0.0.1:2525")
16- .with_milter(MilterFunc::new(|message: Message<'static>| {
17+ .with_milter(Milter::new(|message: Message<'static>| {
18 Box::pin(async move { Ok(message.to_owned()) })
19 }))
20- .with_delivery(DeliveryFunc::new(|message: Message<'static>| {
21+ .with_delivery(Delivery::new(|message: Message<'static>| {
22 Box::pin(async move { print_message(message.to_owned()).await })
23 }))
24 .with_session_opts(SessionOptions::default());
25 diff --git a/maitred/src/delivery.rs b/maitred/src/delivery.rs
26index 3a8b41e..ea52344 100644
27--- a/maitred/src/delivery.rs
28+++ b/maitred/src/delivery.rs
29 @@ -1,7 +1,6 @@
30 use std::result::Result as StdResult;
31 use std::{future::Future, pin::Pin, sync::Arc};
32
33- use async_trait::async_trait;
34 use mail_parser::Message;
35
36 /// Result type containing a new message with possible modifications or an
37 @@ -15,34 +14,18 @@ pub enum Error {
38 Server(String),
39 }
40
41- /// Delivery is the final step processing mail and can implement
42- #[async_trait]
43- pub trait Delivery {
44- /// Apply the milter function to the incoming message
45- async fn deliver(&self, message: Message<'static>) -> Result;
46- }
47-
48+ /// Delivery is the final stage of accepting an e-mail and may be invoked
49+ /// multiple times depending on the server configuration.
50 #[derive(Clone)]
51- pub struct Func<F>(Arc<F>)
52+ pub struct Delivery<F>(pub Arc<F>)
53 where
54- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync;
55+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Sync + 'static;
56
57- impl<F> Func<F>
58+ impl<F> Delivery<F>
59 where
60- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync,
61+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Sync + 'static,
62 {
63 pub fn new(func: F) -> Self {
64- Func(Arc::new(func))
65- }
66- }
67-
68- #[async_trait]
69- impl<F> Delivery for Func<F>
70- where
71- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync,
72- {
73- async fn deliver(&self, message: Message<'static>) -> Result {
74- let f = (self.0)(message);
75- f.await
76+ Delivery(Arc::new(func))
77 }
78 }
79 diff --git a/maitred/src/lib.rs b/maitred/src/lib.rs
80index 80f9951..c8b9614 100644
81--- a/maitred/src/lib.rs
82+++ b/maitred/src/lib.rs
83 @@ -3,8 +3,8 @@
84 //! # Example SMTP Server
85 //! ```rust
86 //! use maitred::{
87- //! mail_parser::Message, DeliveryError, DeliveryFunc, Error,
88- //! MilterFunc, Server, SessionOptions,
89+ //! mail_parser::Message, DeliveryError, Delivery, Error,
90+ //! Milter, Server, SessionOptions,
91 //! };
92 //! use tracing::Level;
93 //!
94 @@ -33,10 +33,10 @@
95 //! // Set the subscriber as the default subscriber
96 //! let mut mail_server = Server::default()
97 //! .address("127.0.0.1:2525")
98- //! .with_milter(MilterFunc::new(|message: Message<'static>| {
99+ //! .with_milter(Milter::new(|message: Message<'static>| {
100 //! Box::pin(async move { Ok(message.to_owned()) })
101 //! }))
102- //! .with_delivery(DeliveryFunc::new(|message: Message<'static>| {
103+ //! .with_delivery(Delivery::new(|message: Message<'static>| {
104 //! Box::pin(async move { print_message(message.to_owned()).await })
105 //! }))
106 //! .with_session_opts(SessionOptions::default());
107 @@ -59,9 +59,9 @@ mod worker;
108 use smtp_proto::{Request, Response as SmtpResponse};
109 use transport::Response;
110
111- pub use delivery::{Delivery, Error as DeliveryError, Func as DeliveryFunc};
112+ pub use delivery::{Delivery, Error as DeliveryError};
113 pub use expand::{Error as ExpansionError, Expansion, Func as ExpansionFunc};
114- pub use milter::{Error as MilterError, Func as MilterFunc, Milter};
115+ pub use milter::{Error as MilterError, Milter};
116
117 pub use error::Error;
118 pub use pipeline::{Pipeline, Transaction};
119 diff --git a/maitred/src/milter.rs b/maitred/src/milter.rs
120index a665628..0ce1521 100644
121--- a/maitred/src/milter.rs
122+++ b/maitred/src/milter.rs
123 @@ -3,7 +3,6 @@ use std::pin::Pin;
124 use std::result::Result as StdResult;
125 use std::sync::Arc;
126
127- use async_trait::async_trait;
128 use mail_parser::Message;
129
130 /// Result type containing a new message with possible modifications or an
131 @@ -20,19 +19,11 @@ pub enum Error {
132 /// A [Milter](https://en.wikipedia.org/wiki/Milter) accepts an email message
133 /// and performs some permutation, modification, or rejection and then returns
134 /// the message.
135- #[async_trait]
136- pub trait Milter {
137- /// Apply the milter function to the incoming message
138- async fn apply(&self, message: Message<'static>) -> Result;
139- }
140-
141- /// Wrapper implementing the Milter trait from a closure
142- /// # Example
143 /// ```rust
144 /// use mail_parser::Message;
145- /// use maitred::MilterFunc;
146+ /// use maitred::Milter;
147 ///
148- /// let milter = MilterFunc::new(|message: Message<'static>| {
149+ /// let milter = Milter::new(|message: Message<'static>| {
150 /// Box::pin(async move {
151 /// // rewrite message here
152 /// Ok(Message::default().to_owned())
153 @@ -40,32 +31,15 @@ pub trait Milter {
154 /// });
155 /// ```
156 #[derive(Clone)]
157- pub struct Func<F>(Arc<F>)
158+ pub struct Milter<F>(pub Arc<F>)
159 where
160- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>>
161- + Send
162- + Sync;
163+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync;
164
165- impl<F> Func<F>
166+ impl<F> Milter<F>
167 where
168- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>>
169- + Send
170- + Sync
171+ F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync,
172 {
173 pub fn new(func: F) -> Self {
174- Func(Arc::new(func))
175- }
176- }
177-
178- #[async_trait]
179- impl<F> Milter for Func<F>
180- where
181- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>>
182- + Send
183- + Sync
184- {
185- async fn apply(&self, message: Message<'static>) -> Result {
186- let f = (self.0)(message);
187- f.await
188+ Milter(Arc::new(func))
189 }
190 }
191 diff --git a/maitred/src/server.rs b/maitred/src/server.rs
192index f99f9dd..50ef55f 100644
193--- a/maitred/src/server.rs
194+++ b/maitred/src/server.rs
195 @@ -1,3 +1,5 @@
196+ use std::future::Future;
197+ use std::pin::Pin;
198 use std::rc::Rc;
199 use std::sync::Arc;
200 use std::time::Duration;
201 @@ -8,6 +10,7 @@ use crossbeam_deque::Stealer;
202 use crossbeam_deque::Worker as WorkQueue;
203 use futures::SinkExt;
204 use futures::StreamExt;
205+ use mail_parser::Message;
206 use smtp_proto::Request;
207 use tokio::sync::mpsc::Sender;
208 use tokio::sync::Mutex;
209 @@ -21,8 +24,7 @@ use crate::pipeline::Pipeline;
210 use crate::session::Session;
211 use crate::transport::{Response, Transport};
212 use crate::worker::{Packet, Worker};
213- use crate::Delivery;
214- use crate::{Chunk, Milter};
215+ use crate::{Chunk, Delivery, Milter};
216
217 /// The default port the server will listen on if none was specified in it's
218 /// configuration options.
219 @@ -64,30 +66,46 @@ impl ConditionalPipeline<'_> {
220 /// as they are received.
221 pub struct Server<D, M>
222 where
223- D: Delivery + Clone + Send + Sync,
224- M: Milter + Clone + Send + Sync,
225+ D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
226+ + Clone
227+ + Send
228+ + Sync
229+ + 'static,
230+ M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
231+ + Send
232+ + Sync
233+ + 'static,
234 {
235 address: String,
236 global_timeout: Duration,
237 options: Option<Rc<crate::session::Options>>,
238- milter: Option<M>,
239- delivery: Option<D>,
240+ milter: Option<Milter<M>>,
241+ delivery: Option<Delivery<D>>,
242 n_threads: usize,
243 shutdown_handles: Vec<Sender<bool>>,
244 }
245
246 impl<D, M> Default for Server<D, M>
247 where
248- M: Milter + Clone + Send + Sync,
249- D: Delivery + Clone + Send + Sync,
250+ D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
251+ + Clone
252+ + Send
253+ + Sync
254+ + 'static,
255+
256+ M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
257+ + Clone
258+ + Send
259+ + Sync
260+ + 'static,
261 {
262 fn default() -> Self {
263 Server::<D, M> {
264 address: DEFAULT_LISTEN_ADDR.to_string(),
265 global_timeout: Duration::from_secs(DEFAULT_GLOBAL_TIMEOUT_SECS),
266 options: None,
267- milter: None,
268- delivery: None,
269+ milter: None::<Milter<M>>,
270+ delivery: None::<Delivery<D>>,
271 n_threads: std::thread::available_parallelism().unwrap().into(),
272 shutdown_handles: vec![],
273 }
274 @@ -96,8 +114,16 @@ where
275
276 impl<D, M> Server<D, M>
277 where
278- M: Milter + Clone + Send + Sync + 'static,
279- D: Delivery + Clone + Send + Sync + 'static,
280+ D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
281+ + Clone
282+ + Send
283+ + Sync
284+ + 'static,
285+ M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
286+ + Clone
287+ + Send
288+ + Sync
289+ + 'static,
290 {
291 /// Listener address for the SMTP server to bind to listen for incoming
292 /// connections.
293 @@ -122,13 +148,13 @@ where
294 }
295
296 /// Process each message with the provided milter before it is delivered
297- pub fn with_milter(mut self, milter: M) -> Self {
298+ pub fn with_milter(mut self, milter: Milter<M>) -> Self {
299 self.milter = Some(milter);
300 self
301 }
302
303 /// Delivery handles the delivery of the final message
304- pub fn with_delivery(mut self, delivery: D) -> Self {
305+ pub fn with_delivery(mut self, delivery: Delivery<D>) -> Self {
306 self.delivery = Some(delivery);
307 self
308 }
309 @@ -202,8 +228,8 @@ where
310 self.shutdown_handles.push(tx);
311 let global_queue = global_queue.clone();
312 let stealers = stealers.clone();
313- let milter = self.milter.clone().expect("Milter not configured");
314- let delivery = self.delivery.clone().expect("Delivery not configured");
315+ let milter = self.milter.clone();
316+ let delivery = self.delivery.clone();
317 tokio::task::spawn(async move {
318 let mut worker = Worker {
319 milter,
320 @@ -271,7 +297,7 @@ where
321 #[cfg(test)]
322 mod test {
323
324- use crate::{DeliveryFunc, MilterFunc};
325+ use crate::{Delivery, Milter};
326
327 use super::*;
328
329 @@ -344,10 +370,10 @@ mod test {
330 ..Default::default()
331 };
332 let server = Server::default()
333- .with_milter(MilterFunc::new(|_: Message<'static>| {
334+ .with_milter(Milter::new(|_: Message<'static>| {
335 Box::pin(async move { Ok(Message::default().into_owned()) })
336 }))
337- .with_delivery(DeliveryFunc::new(|_: Message<'static>| {
338+ .with_delivery(Delivery::new(|_: Message<'static>| {
339 Box::pin(async move { Ok(()) })
340 }));
341 let framed = Framed::new(stream, Transport::default());
342 diff --git a/maitred/src/worker.rs b/maitred/src/worker.rs
343index d91dee2..6df7eef 100644
344--- a/maitred/src/worker.rs
345+++ b/maitred/src/worker.rs
346 @@ -1,3 +1,5 @@
347+ use std::future::Future;
348+ use std::pin::Pin;
349 use std::sync::Arc;
350 use std::{iter, time::Duration};
351
352 @@ -7,7 +9,8 @@ use mail_parser::Message;
353 use tokio::sync::{mpsc::Receiver, Mutex};
354 use url::Host;
355
356- use crate::{Delivery, Error, Milter, Session};
357+ use crate::milter::Milter;
358+ use crate::{Delivery, Error, Session};
359
360 /// Session details to be passed internally for processing
361 #[derive(Clone, Debug)]
362 @@ -38,11 +41,19 @@ impl From<Session> for Packet {
363 /// SPF Verification
364 pub(crate) struct Worker<D, M>
365 where
366- D: Delivery + Clone + Send + Sync,
367- M: Milter + Clone + Send + Sync,
368+ D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
369+ + Clone
370+ + Send
371+ + Sync
372+ + 'static,
373+ M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
374+ + Clone
375+ + Send
376+ + Sync
377+ + 'static,
378 {
379- pub milter: M,
380- pub delivery: D,
381+ pub milter: Option<Milter<M>>,
382+ pub delivery: Option<Delivery<D>>,
383 pub global_queue: Arc<Injector<Packet>>,
384 pub stealers: Vec<Stealer<Packet>>,
385 pub local_queue: Arc<Mutex<WorkQueue<Packet>>>,
386 @@ -51,8 +62,14 @@ where
387
388 impl<D, M> Worker<D, M>
389 where
390- D: Delivery + Clone + Send + Sync,
391- M: Milter + Clone + Send + Sync + 'static,
392+ D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
393+ + Clone
394+ + Send
395+ + Sync,
396+ M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
397+ + Clone
398+ + Send
399+ + Sync,
400 {
401 async fn next_packet(&self) -> Option<Packet> {
402 let local_queue = self.local_queue.lock().await;
403 @@ -80,19 +97,24 @@ where
404
405 if let Some(packet) = self.next_packet().await {
406 let mut message = packet.body.unwrap();
407- match self.milter.apply(message.clone()).await {
408- Ok(modified) => {
409- tracing::info!("Milter finished successfully");
410- message = modified;
411- }
412- Err(err) => {
413- tracing::warn!("Milter failed to apply: {:?}", err);
414+ if let Some(milter) = &self.milter {
415+ match (milter.0)(message.clone()).await {
416+ Ok(modified) => {
417+ tracing::info!("Milter finished successfully");
418+ message = modified;
419+ }
420+ Err(err) => {
421+ tracing::warn!("Milter failed to apply: {:?}", err);
422+ }
423+ };
424+ }
425+
426+ if let Some(delivery) = &self.delivery {
427+ match (delivery.0)(message.clone()).await {
428+ Ok(_) => tracing::info!("Message successfully delivered"),
429+ // TODO: Implement retry here
430+ Err(err) => tracing::warn!("Message could not be delievered: {:?}", err),
431 }
432- };
433- match self.delivery.deliver(message.clone()).await {
434- Ok(_) => tracing::info!("Message successfully delivered"),
435- // TODO: Implement retry here
436- Err(err) => tracing::warn!("Message could not be delievered: {:?}", err),
437 }
438 } else {
439 ticker.tick().await;