Author: Kevin Schoon [me@kevinschoon.com]
Hash: f3cd75be60f4bd71b6a7db1d06d6304c12f9c4fb
Timestamp: Thu, 15 Aug 2024 00:28:12 +0000 (2 months ago)

+143 -50 +/-5 browse
implement basic milter support without queue or delivery
1diff --git a/cmd/maitred-debug/src/main.rs b/cmd/maitred-debug/src/main.rs
2index e8ddae1..d62121f 100644
3--- a/cmd/maitred-debug/src/main.rs
4+++ b/cmd/maitred-debug/src/main.rs
5 @@ -1,4 +1,4 @@
6- use maitred::{Error, Server, SessionOptions};
7+ use maitred::{mail_parser::Message, Error, MilterFunc, Server, SessionOptions};
8 use tracing::Level;
9
10 #[tokio::main]
11 @@ -13,6 +13,12 @@ async fn main() -> Result<(), Error> {
12 // Set the subscriber as the default subscriber
13 let mail_server = Server::new("localhost")
14 .address("127.0.0.1:2525")
15+ .with_milter(MilterFunc(|message: &Message| {
16+ println!("{:?}", message);
17+ async move {
18+ Ok(Message::default().into_owned())
19+ }
20+ }))
21 .with_session_opts(SessionOptions::default());
22 mail_server.listen().await?;
23 Ok(())
24 diff --git a/maitred/src/lib.rs b/maitred/src/lib.rs
25index c13f738..ca47995 100644
26--- a/maitred/src/lib.rs
27+++ b/maitred/src/lib.rs
28 @@ -23,6 +23,7 @@
29
30 mod error;
31 mod expand;
32+ mod milter;
33 mod pipeline;
34 mod server;
35 mod session;
36 @@ -34,6 +35,7 @@ use transport::Response;
37
38 pub use error::Error;
39 pub use expand::{Error as ExpansionError, Expansion, Func as ExpansionFunc};
40+ pub use milter::{Error as MilterError, Func as MilterFunc, Milter};
41 pub use pipeline::{Pipeline, Transaction};
42 pub use server::Server;
43 pub use session::{
44 @@ -43,6 +45,7 @@ pub use session::{
45 pub use verify::{Error as VerifyError, Func as VerifyFunc, Verify};
46
47 pub use email_address;
48+ pub use mail_parser;
49 pub use smtp_proto;
50
51 /// Chunk is a logical set of SMTP resposnes that might be generated from one
52 diff --git a/maitred/src/milter.rs b/maitred/src/milter.rs
53new file mode 100644
54index 0000000..978f25e
55--- /dev/null
56+++ b/maitred/src/milter.rs
57 @@ -0,0 +1,56 @@
58+ use std::future::Future;
59+ use std::result::Result as StdResult;
60+
61+ use async_trait::async_trait;
62+ use mail_parser::Message;
63+
64+ /// Result type containing a new message with possible modifications or an
65+ /// error indicating it should not be processed.
66+ pub type Result = StdResult<Message<'static>, Error>;
67+
68+ /// An error encountered while expanding a mail address
69+ #[derive(Debug, thiserror::Error)]
70+ pub enum Error {
71+ /// Indicates an unspecified error that occurred during milting.
72+ #[error("Internal Server Error: {0}")]
73+ Server(String),
74+ }
75+
76+ /// A Milter (https://en.wikipedia.org/wiki/Milter) accepts an email message
77+ /// and performs some permutation, modification, or rejection and then returns
78+ /// the message.
79+ #[async_trait]
80+ pub trait Milter {
81+ /// Apply the milter function to the incoming message
82+ async fn apply(&self, message: &Message) -> Result;
83+ }
84+
85+ /// Helper wrapper implementing the Expansion trait
86+ /// # Example
87+ /// ```rust
88+ /// use mail_parser::Message;
89+ /// use maitred::MilterFunc;
90+ ///
91+ /// let my_expn_fn = MilterFunc(|message: &Message| {
92+ /// async move {
93+ /// // rewrite message here
94+ /// Ok(Message::default().to_owned())
95+ /// }
96+ /// });
97+ /// ```
98+ pub struct Func<F, T>(pub F)
99+ where
100+ F: Fn(&Message) -> T + Sync,
101+ T: Future<Output = Result> + Send;
102+
103+ #[async_trait]
104+ impl<F, T> Milter for Func<F, T>
105+ where
106+ F: Fn(&Message) -> T + Sync,
107+ T: Future<Output = Result> + Send,
108+ {
109+ async fn apply(&self, message: &Message) -> Result {
110+ let f = (self.0)(message);
111+ f.await
112+ }
113+ }
114 diff --git a/maitred/src/server.rs b/maitred/src/server.rs
115index a78b7ec..00d8f5b 100644
116--- a/maitred/src/server.rs
117+++ b/maitred/src/server.rs
118 @@ -1,8 +1,10 @@
119 use std::rc::Rc;
120+ use std::sync::Arc;
121 use std::time::Duration;
122
123 use bytes::Bytes;
124 use futures::SinkExt;
125+ use mail_parser::MessageParser;
126 use smtp_proto::Request;
127 use tokio::sync::Mutex;
128 use tokio::{net::TcpListener, time::timeout};
129 @@ -13,7 +15,7 @@ use crate::error::Error;
130 use crate::pipeline::Pipeline;
131 use crate::session::Session;
132 use crate::transport::{Response, Transport};
133- use crate::Chunk;
134+ use crate::{Chunk, Milter};
135
136 /// The default port the server will listen on if none was specified in it's
137 /// configuration options.
138 @@ -58,6 +60,7 @@ pub struct Server {
139 hostname: String,
140 global_timeout: Duration,
141 options: Option<Rc<crate::session::Options>>,
142+ milters: Vec<Arc<dyn Milter>>,
143 }
144
145 impl Default for Server {
146 @@ -67,6 +70,7 @@ impl Default for Server {
147 hostname: String::default(),
148 global_timeout: Duration::from_secs(DEFAULT_GLOBAL_TIMEOUT_SECS),
149 options: None,
150+ milters: vec![],
151 }
152 }
153 }
154 @@ -102,6 +106,16 @@ impl Server {
155 self
156 }
157
158+ /// Append one or more milters which will be applied to messages after the
159+ /// session has been closed but before they are enqued for delivery.
160+ pub fn with_milter<T>(mut self, milter: T) -> Self
161+ where
162+ T: Milter + 'static,
163+ {
164+ self.milters.push(Arc::new(milter));
165+ self
166+ }
167+
168 async fn process<T>(
169 &self,
170 mut framed: Framed<T, Transport>,
171 @@ -170,16 +184,25 @@ impl Server {
172 }
173
174 let greeting = session.greeting();
175-
176 let session = Mutex::new(session);
177-
178 let mut pipelined = ConditionalPipeline {
179 session: &session,
180 pipeline: &mut Pipeline::default(),
181 };
182
183- if let Err(err) = self.process(framed, &mut pipelined, greeting).await {
184- tracing::warn!("Client encountered an error: {:?}", err);
185+ match self.process(framed, &mut pipelined, greeting).await {
186+ Ok(_) => {
187+ let session = session.lock().await;
188+ let message = session.body.as_ref().expect("Session has no body");
189+ tracing::info!("session concluded successfully");
190+ // FIXME: Pass into queue and actually process appropriately
191+ for milter in self.milters.clone() {
192+ milter.apply(message).await.unwrap();
193+ }
194+ }
195+ Err(err) => {
196+ tracing::warn!("Client encountered an error: {:?}", err);
197+ }
198 }
199 }
200 }
201 @@ -188,8 +211,11 @@ impl Server {
202 #[cfg(test)]
203 mod test {
204
205+ use crate::MilterFunc;
206+
207 use super::*;
208
209+ use mail_parser::Message;
210 use std::io;
211 use std::pin::Pin;
212 use std::task::{Context, Poll};
213 @@ -257,7 +283,10 @@ mod test {
214 ],
215 ..Default::default()
216 };
217- let server = Server::new("example.org");
218+ let server = Server::new("example.org").with_milter(MilterFunc(|message: &Message| {
219+ println!("{:?}", message);
220+ async move { Ok(Message::default().into_owned()) }
221+ }));
222 let framed = Framed::new(stream, Transport::default());
223 let session = Session::default();
224 let greeting = session.greeting();
225 diff --git a/maitred/src/session.rs b/maitred/src/session.rs
226index 18fc81f..58da7d1 100644
227--- a/maitred/src/session.rs
228+++ b/maitred/src/session.rs
229 @@ -6,14 +6,14 @@ use std::sync::Arc;
230 use bytes::Bytes;
231 use email_address::EmailAddress;
232
233- use mail_parser::MessageParser;
234+ use mail_parser::{Message, MessageParser};
235 use smtp_proto::{EhloResponse, Request, Response as SmtpResponse};
236 use url::Host;
237
238 use crate::expand::Expansion;
239 use crate::transport::Response;
240 use crate::verify::Verify;
241- use crate::{smtp_chunk, smtp_chunk_err, smtp_chunk_ok};
242+ use crate::{smtp_chunk, smtp_chunk_err, smtp_chunk_ok, Milter};
243 use crate::{smtp_response, Chunk};
244
245 /// Default help banner returned from a HELP command without any parameters
246 @@ -132,7 +132,7 @@ impl Options {
247 #[derive(Default)]
248 pub struct Session {
249 /// message body
250- pub body: Option<Vec<u8>>,
251+ pub body: Option<Message<'static>>,
252 /// mailto address
253 pub mail_from: Option<EmailAddress>,
254 /// rcpt address
255 @@ -283,25 +283,27 @@ impl Session {
256 panic!("Transfer mode changed from DATA to BDAT")
257 }
258 DataTransfer::Bdat => {
259- let message_payload = data
260- .as_ref()
261- .expect("data returned without a payload")
262- .to_vec();
263+ let message_payload =
264+ data.expect("data returned without a payload").to_vec();
265 self.check_body(&message_payload)?;
266 let parser = MessageParser::new();
267- let response = match parser.parse(&message_payload) {
268- Some(_) => smtp_chunk_ok!(250, 0, 0, 0, "OK"),
269- None => smtp_chunk_err!(
270- 500,
271- 0,
272- 0,
273- 0,
274- "Cannot parse message payload".to_string()
275- ),
276- }?;
277- self.data_transfer = None;
278- self.body = Some(message_payload.clone());
279- Ok(response)
280+ match parser.parse(&message_payload) {
281+ Some(msg) => {
282+ self.body = Some(msg.into_owned());
283+ self.data_transfer = None;
284+ smtp_chunk_ok!(250, 0, 0, 0, "OK")
285+ }
286+ None => {
287+ self.data_transfer = None;
288+ smtp_chunk_err!(
289+ 500,
290+ 0,
291+ 0,
292+ 0,
293+ "Cannot parse message payload".to_string()
294+ )
295+ }
296+ }
297 }
298 }
299 } else {
300 @@ -377,25 +379,27 @@ impl Session {
301 panic!("Transfer mode changed from BDAT to DATA")
302 }
303 DataTransfer::Data => {
304- let message_payload = data
305- .as_ref()
306- .expect("data returned without a payload")
307- .to_vec();
308+ let message_payload =
309+ data.expect("data returned without a payload").to_vec();
310 self.check_body(&message_payload)?;
311 let parser = MessageParser::new();
312- let response = match parser.parse(&message_payload) {
313- Some(_) => smtp_chunk_ok!(250, 0, 0, 0, "OK".to_string()),
314- None => smtp_chunk_err!(
315- 500,
316- 0,
317- 0,
318- 0,
319- "Cannot parse message payload".to_string()
320- ),
321- }?;
322- self.data_transfer = None;
323- self.body = Some(message_payload.clone());
324- Ok(response)
325+ match parser.parse(&message_payload) {
326+ Some(msg) => {
327+ self.body = Some(msg.into_owned());
328+ self.data_transfer = None;
329+ smtp_chunk_ok!(250, 0, 0, 0, "OK")
330+ }
331+ None => {
332+ self.data_transfer = None;
333+ smtp_chunk_err!(
334+ 500,
335+ 0,
336+ 0,
337+ 0,
338+ "Cannot parse message payload".to_string()
339+ )
340+ }
341+ }
342 }
343 }
344 } else {
345 @@ -422,8 +426,6 @@ impl Session {
346
347 #[cfg(test)]
348 mod test {
349- use std::sync::Arc;
350-
351 use futures::stream::{self, StreamExt};
352 use smtp_proto::{MailFrom, RcptTo};
353 use tokio::sync::Mutex;
354 @@ -611,7 +613,6 @@ mod test {
355 Options::default()
356 .verification(crate::verify::Func(|addr: &EmailAddress| {
357 let addr = addr.clone();
358-
359 async move {
360 assert!(addr.email() == "bar@baz.com");
361 Ok(())
362 @@ -781,9 +782,7 @@ transport rather than the session.
363 .first()
364 .is_some_and(|rcpt_to| rcpt_to.email() == "bar@example.org")));
365 assert!(session.body.as_ref().is_some_and(|body| {
366- let message = MessageParser::new().parse(body).unwrap();
367- message
368- .subject()
369+ body.subject()
370 .is_some_and(|subject| subject == "Hello World")
371 }));
372 }