Commit
Author: Kevin Schoon [me@kevinschoon.com]
Hash: aa7b016861cbbf34ebe6b4dfac07708d07a0aeeb
Timestamp: Sat, 31 Aug 2024 12:25:39 +0000 (4 months ago)

+162 -211 +/-9 browse
refactor all configurable closures as traits again
1diff --git a/cmd/maitred-debug/src/main.rs b/cmd/maitred-debug/src/main.rs
2index f859a08..ec5c708 100644
3--- a/cmd/maitred-debug/src/main.rs
4+++ b/cmd/maitred-debug/src/main.rs
5 @@ -1,7 +1,8 @@
6+ use tracing::Level;
7+
8 use maitred::{
9- mail_parser::Message, Delivery, DeliveryError, Error, Milter, Server, SessionOptions,
10+ mail_parser::Message, DeliveryError, DeliveryFunc, Error, MilterFunc, Server, SessionOptions,
11 };
12- use tracing::Level;
13
14 async fn print_message(message: Message<'static>) -> Result<(), DeliveryError> {
15 println!("New SMTP Message:");
16 @@ -28,10 +29,12 @@ async fn main() -> Result<(), Error> {
17 // Set the subscriber as the default subscriber
18 let mut mail_server = Server::default()
19 .address("127.0.0.1:2525")
20- .with_milter(Milter::new(|message: Message<'static>| {
21+ .with_milter(MilterFunc(|message: &Message<'static>| {
22+ let message = message.clone();
23 Box::pin(async move { Ok(message.to_owned()) })
24 }))
25- .with_delivery(Delivery::new(|message: Message<'static>| {
26+ .with_delivery(DeliveryFunc(|message: &Message<'static>| {
27+ let message = message.clone();
28 Box::pin(async move { print_message(message.to_owned()).await })
29 }))
30 .with_session_opts(SessionOptions::default());
31 diff --git a/maitred/src/delivery.rs b/maitred/src/delivery.rs
32index ea52344..e8ecb6d 100644
33--- a/maitred/src/delivery.rs
34+++ b/maitred/src/delivery.rs
35 @@ -1,31 +1,46 @@
36- use std::result::Result as StdResult;
37- use std::{future::Future, pin::Pin, sync::Arc};
38+ use std::future::Future;
39
40+ use async_trait::async_trait;
41 use mail_parser::Message;
42
43- /// Result type containing a new message with possible modifications or an
44- /// error indicating it should not be processed.
45- pub type Result = StdResult<(), Error>;
46-
47 #[derive(Debug, thiserror::Error)]
48- pub enum Error {
49+ pub enum DeliveryError {
50 /// Indicates an unspecified error that occurred during milting.
51 #[error("Internal Server Error: {0}")]
52 Server(String),
53 }
54
55- /// Delivery is the final stage of accepting an e-mail and may be invoked
56+ /// Delivery is the final stage of accepting an e-mail and may be invoked
57 /// multiple times depending on the server configuration.
58- #[derive(Clone)]
59- pub struct Delivery<F>(pub Arc<F>)
60+ #[async_trait]
61+ pub trait Delivery: Sync + Send {
62+ async fn deliver(&self, message: &Message<'static>) -> Result<(), DeliveryError>;
63+ }
64+
65+ /// DeliveryFunc wraps an async closure implementing the Delivery trait.
66+ /// ```rust
67+ /// use maitred::DeliveryFunc;
68+ /// use maitred::mail_parser::Message;
69+ ///
70+ /// let delivery = DeliveryFunc(|message: &Message<'static>| {
71+ /// async move {
72+ /// Ok(())
73+ /// }
74+ /// });
75+ /// ```
76+ pub struct DeliveryFunc<F, T>(pub F)
77 where
78- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Sync + 'static;
79+ F: Fn(&Message<'static>) -> T + Sync + Send,
80+ T: Future<Output = Result<(), DeliveryError>> + Send;
81
82- impl<F> Delivery<F>
83+ #[async_trait]
84+ impl<F, T> Delivery for DeliveryFunc<F, T>
85 where
86- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Sync + 'static,
87+ F: Fn(&Message<'static>) -> T + Sync + Send,
88+ T: Future<Output = Result<(), DeliveryError>> + Send,
89 {
90- pub fn new(func: F) -> Self {
91- Delivery(Arc::new(func))
92+ async fn deliver(&self, message: &Message<'static>) -> Result<(), DeliveryError> {
93+ let f = (self.0)(message);
94+ f.await
95 }
96 }
97 diff --git a/maitred/src/expand.rs b/maitred/src/expand.rs
98index 232eb21..788b0a0 100644
99--- a/maitred/src/expand.rs
100+++ b/maitred/src/expand.rs
101 @@ -1,15 +1,11 @@
102- use std::{future::Future, result::Result as StdResult};
103+ use std::future::Future;
104
105 use async_trait::async_trait;
106 use email_address::EmailAddress;
107
108- /// Result type containing any of the associated e-mail addresses with the
109- /// given mailing list name.
110- pub type Result = StdResult<Vec<EmailAddress>, Error>;
111-
112 /// An error encountered while expanding a mail address
113 #[derive(Debug, thiserror::Error)]
114- pub enum Error {
115+ pub enum ExpansionError {
116 /// Indicates an unspecified error that occurred during expansion.
117 #[error("Internal Server Error: {0}")]
118 Server(String),
119 @@ -25,10 +21,10 @@ pub enum Error {
120 #[async_trait]
121 pub trait Expansion {
122 /// Expand the group into an array of members
123- async fn expand(&self, name: &str) -> Result;
124+ async fn expand(&self, name: &str) -> Result<Vec<EmailAddress>, ExpansionError>;
125 }
126
127- /// Helper wrapper implementing the Expansion trait
128+ /// ExpansionFunc wraps an async closure implementing the Expansion trait
129 /// # Example
130 /// ```rust
131 /// use email_address::EmailAddress;
132 @@ -43,18 +39,18 @@ pub trait Expansion {
133 /// }
134 /// });
135 /// ```
136- pub struct Func<F, T>(pub F)
137+ pub struct ExpansionFunc<F, T>(pub F)
138 where
139 F: Fn(&str) -> T + Sync,
140- T: Future<Output = Result> + Send;
141+ T: Future<Output = Result<Vec<EmailAddress>, ExpansionError>> + Send;
142
143 #[async_trait]
144- impl<F, T> Expansion for Func<F, T>
145+ impl<F, T> Expansion for ExpansionFunc<F, T>
146 where
147 F: Fn(&str) -> T + Sync,
148- T: Future<Output = Result> + Send,
149+ T: Future<Output = Result<Vec<EmailAddress>, ExpansionError>> + Send,
150 {
151- async fn expand(&self, name: &str) -> Result {
152+ async fn expand(&self, name: &str) -> Result<Vec<EmailAddress>, ExpansionError> {
153 let f = (self.0)(name);
154 f.await
155 }
156 diff --git a/maitred/src/lib.rs b/maitred/src/lib.rs
157index b78d80f..d95857e 100644
158--- a/maitred/src/lib.rs
159+++ b/maitred/src/lib.rs
160 @@ -3,7 +3,7 @@
161 //! # Example SMTP Server
162 //! ```rust
163 //! use maitred::{
164- //! mail_parser::Message, DeliveryError, Delivery, Error,
165+ //! mail_parser::Message, DeliveryError, DeliveryFunc, MilterFunc, Error,
166 //! Milter, Server, SessionOptions,
167 //! };
168 //! use tracing::Level;
169 @@ -33,10 +33,12 @@
170 //! // Set the subscriber as the default subscriber
171 //! let mut mail_server = Server::default()
172 //! .address("127.0.0.1:2525")
173- //! .with_milter(Milter::new(|message: Message<'static>| {
174+ //! .with_milter(MilterFunc(|message: &Message<'static>| {
175+ //! let message = message.clone();
176 //! Box::pin(async move { Ok(message.to_owned()) })
177 //! }))
178- //! .with_delivery(Delivery::new(|message: Message<'static>| {
179+ //! .with_delivery(DeliveryFunc(|message: &Message<'static>| {
180+ //! let message = message.clone();
181 //! Box::pin(async move { print_message(message.to_owned()).await })
182 //! }))
183 //! .with_session_opts(SessionOptions::default());
184 @@ -45,34 +47,35 @@
185 //! }
186 //! ```
187
188- mod delivery;
189+ pub mod delivery;
190+ pub mod expand;
191+ pub mod milter;
192+ pub mod verify;
193+
194+ pub use email_address;
195+ pub use mail_parser;
196+ pub use smtp_proto;
197+
198 mod error;
199- mod expand;
200- mod milter;
201 mod server;
202 mod session;
203 mod transport;
204- mod verify;
205 mod worker;
206
207 use smtp_proto::Response as SmtpResponse;
208 use transport::Response;
209
210- pub use delivery::{Delivery, Error as DeliveryError};
211- pub use expand::{Error as ExpansionError, Expansion, Func as ExpansionFunc};
212- pub use milter::{Error as MilterError, Milter};
213+ pub use delivery::{Delivery, DeliveryError, DeliveryFunc};
214+ pub use expand::{Expansion, ExpansionError, ExpansionFunc};
215+ pub use milter::{Milter, MilterError, MilterFunc};
216+ pub use verify::{Verify, VerifyError, VerifyFunc};
217
218 pub use error::Error;
219 pub use server::Server;
220 pub use session::{
221- Options as SessionOptions, Session, DEFAULT_CAPABILITIES, DEFAULT_GREETING,
222- DEFAULT_HELP_BANNER, DEFAULT_MAXIMUM_MESSAGE_SIZE,
223+ SessionOptions, DEFAULT_CAPABILITIES, DEFAULT_GREETING, DEFAULT_HELP_BANNER,
224+ DEFAULT_MAXIMUM_MESSAGE_SIZE,
225 };
226- pub use verify::{Error as VerifyError, Func as VerifyFunc, Verify};
227-
228- pub use email_address;
229- pub use mail_parser;
230- pub use smtp_proto;
231
232 /// Generate a single smtp_response
233 macro_rules! smtp_response {
234 diff --git a/maitred/src/milter.rs b/maitred/src/milter.rs
235index 0ce1521..28ffde6 100644
236--- a/maitred/src/milter.rs
237+++ b/maitred/src/milter.rs
238 @@ -1,16 +1,11 @@
239 use std::future::Future;
240- use std::pin::Pin;
241- use std::result::Result as StdResult;
242- use std::sync::Arc;
243
244+ use async_trait::async_trait;
245 use mail_parser::Message;
246
247- /// Result type containing a new message with possible modifications or an
248- /// error indicating it should not be processed.
249- pub type Result = StdResult<Message<'static>, Error>;
250-
251+ /// Milter error is a milter specific error
252 #[derive(Debug, thiserror::Error)]
253- pub enum Error {
254+ pub enum MilterError {
255 /// Indicates an unspecified error that occurred during milting.
256 #[error("Internal Server Error: {0}")]
257 Server(String),
258 @@ -19,27 +14,37 @@ pub enum Error {
259 /// A [Milter](https://en.wikipedia.org/wiki/Milter) accepts an email message
260 /// and performs some permutation, modification, or rejection and then returns
261 /// the message.
262+
263+ #[async_trait]
264+ pub trait Milter: Sync + Send {
265+ async fn apply(&self, message: &Message<'static>) -> Result<Message<'static>, MilterError>;
266+ }
267+
268+ /// MilterFunc wraps an async closure implementing the Milter trait.
269 /// ```rust
270 /// use mail_parser::Message;
271- /// use maitred::Milter;
272+ /// use maitred::MilterFunc;
273 ///
274- /// let milter = Milter::new(|message: Message<'static>| {
275- /// Box::pin(async move {
276+ /// let milter = MilterFunc(|message: &Message<'static>| {
277+ /// async move {
278 /// // rewrite message here
279 /// Ok(Message::default().to_owned())
280- /// })
281+ /// }
282 /// });
283 /// ```
284- #[derive(Clone)]
285- pub struct Milter<F>(pub Arc<F>)
286+ pub struct MilterFunc<F, T>(pub F)
287 where
288- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync;
289+ F: Fn(&Message<'static>) -> T + Sync + Send,
290+ T: Future<Output = Result<Message<'static>, MilterError>> + Send;
291
292- impl<F> Milter<F>
293+ #[async_trait]
294+ impl<F, T> Milter for MilterFunc<F, T>
295 where
296- F: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = Result> + Send>> + Send + Sync,
297+ F: Fn(&Message<'static>) -> T + Sync + Send,
298+ T: Future<Output = Result<Message<'static>, MilterError>> + Send,
299 {
300- pub fn new(func: F) -> Self {
301- Milter(Arc::new(func))
302+ async fn apply(&self, message: &Message<'static>) -> Result<Message<'static>, MilterError> {
303+ let f = (self.0)(message);
304+ f.await
305 }
306 }
307 diff --git a/maitred/src/server.rs b/maitred/src/server.rs
308index 282dc8e..4a5517f 100644
309--- a/maitred/src/server.rs
310+++ b/maitred/src/server.rs
311 @@ -1,5 +1,3 @@
312- use std::future::Future;
313- use std::pin::Pin;
314 use std::rc::Rc;
315 use std::sync::Arc;
316 use std::time::Duration;
317 @@ -9,7 +7,6 @@ use crossbeam_deque::Stealer;
318 use crossbeam_deque::Worker as WorkQueue;
319 use futures::SinkExt;
320 use futures::StreamExt;
321- use mail_parser::Message;
322 use smtp_proto::Request;
323 use tokio::net::TcpListener;
324 use tokio::sync::mpsc::Sender;
325 @@ -19,12 +16,14 @@ use tokio::time::timeout;
326 use tokio_stream::{self as stream};
327 use tokio_util::codec::Framed;
328
329+ use crate::delivery::Delivery;
330 use crate::error::Error;
331- use crate::session::Session;
332+ use crate::milter::Milter;
333+ use crate::session::{Session, SessionOptions};
334 use crate::smtp_response;
335 use crate::transport::{Command, Transport};
336 use crate::worker::{Packet, Worker};
337- use crate::{Delivery, Milter, Response, SmtpResponse};
338+ use crate::{Response, SmtpResponse};
339
340 /// The default port the server will listen on if none was specified in it's
341 /// configuration options.
342 @@ -32,7 +31,7 @@ pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:2525";
343
344 /// Maximum amount of time the server will wait for a command before closing
345 /// the connection.
346- const DEFAULT_GLOBAL_TIMEOUT_SECS: u64 = 300;
347+ pub const DEFAULT_GLOBAL_TIMEOUT_SECS: u64 = 300;
348
349 /// check if the final command is QUIT
350 fn is_quit(reqs: &[Request<String>]) -> bool {
351 @@ -42,67 +41,31 @@ fn is_quit(reqs: &[Request<String>]) -> bool {
352 /// Server implements everything that is required to run an SMTP server by
353 /// binding to the configured address and processing individual TCP connections
354 /// as they are received.
355- pub struct Server<D, M>
356- where
357- D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
358- + Clone
359- + Send
360- + Sync
361- + 'static,
362- M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
363- + Send
364- + Sync
365- + 'static,
366- {
367+ pub struct Server {
368 address: String,
369 global_timeout: Duration,
370- options: Option<Rc<crate::session::Options>>,
371- milter: Option<Milter<M>>,
372- delivery: Option<Delivery<D>>,
373+ options: Option<Rc<crate::session::SessionOptions>>,
374+ milter: Option<Arc<dyn Milter>>,
375+ delivery: Option<Arc<dyn Delivery>>,
376 n_threads: usize,
377 shutdown_handles: Vec<Sender<bool>>,
378 }
379
380- impl<D, M> Default for Server<D, M>
381- where
382- D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
383- + Clone
384- + Send
385- + Sync
386- + 'static,
387-
388- M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
389- + Clone
390- + Send
391- + Sync
392- + 'static,
393- {
394+ impl Default for Server {
395 fn default() -> Self {
396- Server::<D, M> {
397+ Server {
398 address: DEFAULT_LISTEN_ADDR.to_string(),
399 global_timeout: Duration::from_secs(DEFAULT_GLOBAL_TIMEOUT_SECS),
400 options: None,
401- milter: None::<Milter<M>>,
402- delivery: None::<Delivery<D>>,
403+ milter: None,
404+ delivery: None,
405 n_threads: std::thread::available_parallelism().unwrap().into(),
406 shutdown_handles: vec![],
407 }
408 }
409 }
410
411- impl<D, M> Server<D, M>
412- where
413- D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
414- + Clone
415- + Send
416- + Sync
417- + 'static,
418- M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
419- + Clone
420- + Send
421- + Sync
422- + 'static,
423- {
424+ impl Server {
425 /// Listener address for the SMTP server to bind to listen for incoming
426 /// connections.
427 pub fn address(mut self, address: &str) -> Self {
428 @@ -120,20 +83,26 @@ where
429 /// Set session level options that affect the behavior of individual SMTP
430 /// sessions. Most custom behavior is implemented here but not specifying
431 /// any options will provide a limited but functional server.
432- pub fn with_session_opts(mut self, opts: crate::session::Options) -> Self {
433+ pub fn with_session_opts(mut self, opts: SessionOptions) -> Self {
434 self.options = Some(Rc::new(opts));
435 self
436 }
437
438 /// Process each message with the provided milter before it is delivered
439- pub fn with_milter(mut self, milter: Milter<M>) -> Self {
440- self.milter = Some(milter);
441+ pub fn with_milter<T>(mut self, milter: T) -> Self
442+ where
443+ T: Milter + 'static,
444+ {
445+ self.milter = Some(Arc::new(milter));
446 self
447 }
448
449 /// Delivery handles the delivery of the final message
450- pub fn with_delivery(mut self, delivery: Delivery<D>) -> Self {
451- self.delivery = Some(delivery);
452+ pub fn with_delivery<T>(mut self, delivery: T) -> Self
453+ where
454+ T: Delivery + 'static,
455+ {
456+ self.delivery = Some(Arc::new(delivery));
457 self
458 }
459
460 @@ -329,11 +298,10 @@ where
461 #[cfg(test)]
462 mod test {
463
464- use crate::{Delivery, Milter, SessionOptions};
465+ use crate::SessionOptions;
466
467 use super::*;
468
469- use mail_parser::Message;
470 use std::io;
471 use std::pin::Pin;
472 use std::task::{Context, Poll};
473 @@ -402,13 +370,7 @@ mod test {
474 };
475 let server = Server::default()
476 // turn off all extended capabilities
477- .with_session_opts(SessionOptions::default().capabilities(0))
478- .with_milter(Milter::new(|_: Message<'static>| {
479- Box::pin(async move { Ok(Message::default().into_owned()) })
480- }))
481- .with_delivery(Delivery::new(|_: Message<'static>| {
482- Box::pin(async move { Ok(()) })
483- }));
484+ .with_session_opts(SessionOptions::default().capabilities(0));
485 let framed = Framed::new(stream, Transport::default());
486 let global_queue = Arc::new(Injector::<Packet>::new());
487 server.process(framed, global_queue.clone()).await.unwrap();
488 @@ -435,13 +397,7 @@ mod test {
489 ],
490 ..Default::default()
491 };
492- let server = Server::default()
493- .with_milter(Milter::new(|_: Message<'static>| {
494- Box::pin(async move { Ok(Message::default().into_owned()) })
495- }))
496- .with_delivery(Delivery::new(|_: Message<'static>| {
497- Box::pin(async move { Ok(()) })
498- }));
499+ let server = Server::default();
500 let framed = Framed::new(stream, Transport::default());
501 let global_queue = Arc::new(Injector::<Packet>::new());
502 server.process(framed, global_queue.clone()).await.unwrap();
503 diff --git a/maitred/src/session.rs b/maitred/src/session.rs
504index 5c1a081..75b48c8 100644
505--- a/maitred/src/session.rs
506+++ b/maitred/src/session.rs
507 @@ -28,15 +28,10 @@ pub const DEFAULT_MAXIMUM_MESSAGE_SIZE: u64 = 5_000_000;
508 pub const DEFAULT_GREETING: &str = "Maitred ESMTP Server";
509
510 // TODO:
511- // 250-PIPELINING
512- // 250-SIZE 10240000
513- // 250-VRFY
514 // 250-ETRN
515- // 250-ENHANCEDSTATUSCODES
516 // 250-8BITMIME
517 // 250-DSN
518 // 250-SMTPUTF8
519- // 250 CHUNKING
520
521 /// Default SMTP capabilities advertised by the server
522 pub const DEFAULT_CAPABILITIES: u32 = smtp_proto::EXT_SIZE
523 @@ -65,7 +60,7 @@ pub fn timeout(message: &str) -> Response<String> {
524
525 /// Session level options that configure individual SMTP transactions
526 #[derive(Clone)]
527- pub struct Options {
528+ pub struct SessionOptions {
529 pub our_hostname: String,
530 pub maximum_size: u64,
531 pub capabilities: u32,
532 @@ -75,9 +70,9 @@ pub struct Options {
533 pub verification: Option<Arc<dyn Verify>>,
534 }
535
536- impl Default for Options {
537+ impl Default for SessionOptions {
538 fn default() -> Self {
539- Options {
540+ SessionOptions {
541 our_hostname: String::default(),
542 maximum_size: DEFAULT_MAXIMUM_MESSAGE_SIZE,
543 capabilities: DEFAULT_CAPABILITIES,
544 @@ -89,7 +84,7 @@ impl Default for Options {
545 }
546 }
547
548- impl Options {
549+ impl SessionOptions {
550 pub fn our_hostname(mut self, hostname: &str) -> Self {
551 self.our_hostname = hostname.to_string();
552 self
553 @@ -129,7 +124,7 @@ impl Options {
554
555 /// Stateful connection that coresponds to a single SMTP session.
556 #[derive(Default)]
557- pub struct Session {
558+ pub(crate) struct Session {
559 /// message body
560 pub body: Option<Message<'static>>,
561 /// mailto address
562 @@ -142,12 +137,12 @@ pub struct Session {
563 initialized: Option<Mode>,
564
565 // session options
566- opts: Rc<Options>,
567+ opts: Rc<SessionOptions>,
568 }
569
570 impl Session {
571 /// Configure a session with various options that effect it's behavior.
572- pub fn with_options(mut self, opts: Rc<Options>) -> Self {
573+ pub fn with_options(mut self, opts: Rc<SessionOptions>) -> Self {
574 self.opts = opts;
575 self
576 }
577 @@ -174,14 +169,6 @@ impl Session {
578 )
579 }
580
581- /// If the session is in extended mode i.e. EHLO was sent in the beginning
582- /// of the connection.
583- pub fn is_extended(&self) -> bool {
584- self.initialized
585- .as_ref()
586- .is_some_and(|mode| matches!(mode, Mode::Extended))
587- }
588-
589 /// Check if the capability is supported by the session
590 pub fn has_capability(&self, capability: u32) -> bool {
591 self.initialized
592 @@ -566,7 +553,7 @@ mod test {
593 }];
594 let session = Mutex::new(
595 Session::default()
596- .with_options(Options::default().our_hostname(EXAMPLE_HOSTNAME).into()),
597+ .with_options(SessionOptions::default().our_hostname(EXAMPLE_HOSTNAME).into()),
598 );
599 process_all(&session, requests).await;
600 }
601 @@ -606,8 +593,8 @@ mod test {
602 ];
603 let session = Mutex::new(
604 Session::default().with_options(
605- Options::default()
606- .list_expansion(crate::expand::Func(|name: &str| {
607+ SessionOptions::default()
608+ .list_expansion(crate::expand::ExpansionFunc(|name: &str| {
609 let name = name.to_string();
610 async move {
611 assert!(name == "mailing-list");
612 @@ -660,8 +647,8 @@ mod test {
613 ];
614 let session = Mutex::new(
615 Session::default().with_options(
616- Options::default()
617- .verification(crate::verify::Func(|addr: &EmailAddress| {
618+ SessionOptions::default()
619+ .verification(crate::verify::VerifyFunc(|addr: &EmailAddress| {
620 let addr = addr.clone();
621 async move {
622 assert!(addr.email() == "bar@baz.com");
623 @@ -758,7 +745,7 @@ mod test {
624 ];
625 let session = Mutex::new(
626 Session::default().with_options(
627- Options::default()
628+ SessionOptions::default()
629 .our_hostname(EXAMPLE_HOSTNAME)
630 .capabilities(DEFAULT_CAPABILITIES)
631 .into(),
632 @@ -826,7 +813,7 @@ transport rather than the session.
633 ];
634 let session = Mutex::new(
635 Session::default()
636- .with_options(Options::default().our_hostname(EXAMPLE_HOSTNAME).into()),
637+ .with_options(SessionOptions::default().our_hostname(EXAMPLE_HOSTNAME).into()),
638 );
639 process_all(&session, requests).await;
640 let session = session.lock().await;
641 diff --git a/maitred/src/verify.rs b/maitred/src/verify.rs
642index 090c523..47e735e 100644
643--- a/maitred/src/verify.rs
644+++ b/maitred/src/verify.rs
645 @@ -1,15 +1,11 @@
646- use std::{future::Future, result::Result as StdResult};
647+ use std::future::Future;
648
649 use async_trait::async_trait;
650 use email_address::EmailAddress;
651
652- /// Result indicating the VRFY command was successful and the user was
653- /// correctly identified by the server.
654- pub type Result = StdResult<(), Error>;
655-
656 /// An error encountered while verifying an e-mail address
657 #[derive(Debug, thiserror::Error)]
658- pub enum Error {
659+ pub enum VerifyError {
660 /// Indicates an unspecified error that occurred during expansion
661 #[error("Internal Server Error: {0}")]
662 Server(String),
663 @@ -30,22 +26,34 @@ pub enum Error {
664 #[async_trait]
665 pub trait Verify {
666 /// Verify the e-mail address on the server
667- async fn verify(&self, address: &EmailAddress) -> Result;
668+ async fn verify(&self, address: &EmailAddress) -> Result<(), VerifyError>;
669 }
670
671- /// Helper wrapper implementing the Verify trait.
672- pub struct Func<F, T>(pub F)
673+ /// VerifyFunc wraps an async closure implementing the Verify trait.
674+ /// # Example
675+ /// ```rust
676+ /// use maitred::VerifyFunc;
677+ /// use maitred::email_address::EmailAddress;
678+ ///
679+ /// let verify = VerifyFunc(|address: &EmailAddress| {
680+ /// async move {
681+ /// Ok(())
682+ /// }
683+ /// });
684+ ///
685+ /// ```
686+ pub struct VerifyFunc<F, T>(pub F)
687 where
688 F: Fn(&EmailAddress) -> T + Sync,
689- T: Future<Output = Result> + Send;
690+ T: Future<Output = Result<(), VerifyError>> + Send;
691
692 #[async_trait]
693- impl<F, T> Verify for Func<F, T>
694+ impl<F, T> Verify for VerifyFunc<F, T>
695 where
696 F: Fn(&EmailAddress) -> T + Sync,
697- T: Future<Output = Result> + Send,
698+ T: Future<Output = Result<(), VerifyError>> + Send,
699 {
700- async fn verify(&self, address: &EmailAddress) -> Result {
701+ async fn verify(&self, address: &EmailAddress) -> Result<(), VerifyError> {
702 let f = (self.0)(address);
703 f.await
704 }
705 diff --git a/maitred/src/worker.rs b/maitred/src/worker.rs
706index 7865808..e8c2b94 100644
707--- a/maitred/src/worker.rs
708+++ b/maitred/src/worker.rs
709 @@ -1,5 +1,3 @@
710- use std::future::Future;
711- use std::pin::Pin;
712 use std::sync::Arc;
713 use std::{iter, time::Duration};
714
715 @@ -9,8 +7,10 @@ use mail_parser::Message;
716 use tokio::sync::{mpsc::Receiver, Mutex};
717 use url::Host;
718
719+ use crate::delivery::Delivery;
720 use crate::milter::Milter;
721- use crate::{Delivery, Error, Session};
722+ use crate::session::Session;
723+ use crate::Error;
724
725 /// Session details to be passed internally for processing
726 #[derive(Clone, Debug)]
727 @@ -39,38 +39,16 @@ impl From<&Session> for Packet {
728 /// Running DKIM verification
729 /// ARC Verficiation
730 /// SPF Verification
731- pub(crate) struct Worker<D, M>
732- where
733- D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
734- + Clone
735- + Send
736- + Sync
737- + 'static,
738- M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
739- + Clone
740- + Send
741- + Sync
742- + 'static,
743- {
744- pub milter: Option<Milter<M>>,
745- pub delivery: Option<Delivery<D>>,
746+ pub(crate) struct Worker {
747+ pub milter: Option<Arc<dyn Milter>>,
748+ pub delivery: Option<Arc<dyn Delivery>>,
749 pub global_queue: Arc<Injector<Packet>>,
750 pub stealers: Vec<Stealer<Packet>>,
751 pub local_queue: Arc<Mutex<WorkQueue<Packet>>>,
752 pub shutdown_rx: Receiver<bool>,
753 }
754
755- impl<D, M> Worker<D, M>
756- where
757- D: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::delivery::Result> + Send>>
758- + Clone
759- + Send
760- + Sync,
761- M: Fn(Message<'static>) -> Pin<Box<dyn Future<Output = crate::milter::Result> + Send>>
762- + Clone
763- + Send
764- + Sync,
765- {
766+ impl Worker {
767 async fn next_packet(&self) -> Option<Packet> {
768 let local_queue = self.local_queue.lock().await;
769 local_queue.pop().or_else(|| {
770 @@ -98,7 +76,7 @@ where
771 if let Some(packet) = self.next_packet().await {
772 let mut message = packet.body.unwrap();
773 if let Some(milter) = &self.milter {
774- match (milter.0)(message.clone()).await {
775+ match milter.apply(&message).await {
776 Ok(modified) => {
777 tracing::info!("Milter finished successfully");
778 message = modified;
779 @@ -110,7 +88,7 @@ where
780 }
781
782 if let Some(delivery) = &self.delivery {
783- match (delivery.0)(message.clone()).await {
784+ match delivery.deliver(&message).await {
785 Ok(_) => tracing::info!("Message successfully delivered"),
786 // TODO: Implement retry here
787 Err(err) => tracing::warn!("Message could not be delievered: {:?}", err),