Commit
+209 -87 +/-8 browse
1 | diff --git a/.gitignore b/.gitignore |
2 | index eb5a316..25c055c 100644 |
3 | --- a/.gitignore |
4 | +++ b/.gitignore |
5 | @@ -1 +1,2 @@ |
6 | target |
7 | |
8 | diff --git a/Cargo.lock b/Cargo.lock |
9 | index 889e49c..14f2435 100644 |
10 | --- a/Cargo.lock |
11 | +++ b/Cargo.lock |
12 | @@ -226,6 +226,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" |
13 | checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" |
14 | |
15 | [[package]] |
16 | + name = "charset" |
17 | + version = "0.1.5" |
18 | + source = "registry+https://github.com/rust-lang/crates.io-index" |
19 | + checksum = "f1f927b07c74ba84c7e5fe4db2baeb3e996ab2688992e39ac68ce3220a677c7e" |
20 | + dependencies = [ |
21 | + "base64 0.22.1", |
22 | + "encoding_rs", |
23 | + ] |
24 | + |
25 | + [[package]] |
26 | name = "cipher" |
27 | version = "0.4.4" |
28 | source = "registry+https://github.com/rust-lang/crates.io-index" |
29 | @@ -565,6 +575,16 @@ dependencies = [ |
30 | |
31 | [[package]] |
32 | name = "gethostname" |
33 | + version = "0.2.3" |
34 | + source = "registry+https://github.com/rust-lang/crates.io-index" |
35 | + checksum = "c1ebd34e35c46e00bb73e81363248d627782724609fe1b6396f553f68fe3862e" |
36 | + dependencies = [ |
37 | + "libc", |
38 | + "winapi", |
39 | + ] |
40 | + |
41 | + [[package]] |
42 | + name = "gethostname" |
43 | version = "0.4.3" |
44 | source = "registry+https://github.com/rust-lang/crates.io-index" |
45 | checksum = "0176e0459c2e4a1fe232f984bca6890e681076abb9934f6cea7c326f3fc47818" |
46 | @@ -862,7 +882,7 @@ version = "0.3.2" |
47 | source = "registry+https://github.com/rust-lang/crates.io-index" |
48 | checksum = "25f5871d5270ed80f2ee750b95600c8d69b05f8653ad3be913b2ad2e924fefcb" |
49 | dependencies = [ |
50 | - "gethostname", |
51 | + "gethostname 0.4.3", |
52 | ] |
53 | |
54 | [[package]] |
55 | @@ -876,6 +896,27 @@ dependencies = [ |
56 | ] |
57 | |
58 | [[package]] |
59 | + name = "maildir" |
60 | + version = "0.6.4" |
61 | + source = "registry+https://github.com/rust-lang/crates.io-index" |
62 | + checksum = "879a6ae6743ab8219fdee64a569094485bfe18434e82b78b27fac5cce09e1437" |
63 | + dependencies = [ |
64 | + "gethostname 0.2.3", |
65 | + "mailparse", |
66 | + ] |
67 | + |
68 | + [[package]] |
69 | + name = "mailparse" |
70 | + version = "0.14.1" |
71 | + source = "registry+https://github.com/rust-lang/crates.io-index" |
72 | + checksum = "2d096594926cab442e054e047eb8c1402f7d5b2272573b97ba68aa40629f9757" |
73 | + dependencies = [ |
74 | + "charset", |
75 | + "data-encoding", |
76 | + "quoted_printable", |
77 | + ] |
78 | + |
79 | + [[package]] |
80 | name = "maitred" |
81 | version = "0.1.0" |
82 | dependencies = [ |
83 | @@ -888,6 +929,7 @@ dependencies = [ |
84 | "mail-auth", |
85 | "mail-builder", |
86 | "mail-parser", |
87 | + "maildir", |
88 | "md5", |
89 | "smtp-proto", |
90 | "stringprep", |
91 | @@ -1103,6 +1145,12 @@ dependencies = [ |
92 | ] |
93 | |
94 | [[package]] |
95 | + name = "quoted_printable" |
96 | + version = "0.5.1" |
97 | + source = "registry+https://github.com/rust-lang/crates.io-index" |
98 | + checksum = "640c9bd8497b02465aeef5375144c26062e0dcd5939dfcbb0f5db76cb8c17c73" |
99 | + |
100 | + [[package]] |
101 | name = "rand" |
102 | version = "0.8.5" |
103 | source = "registry+https://github.com/rust-lang/crates.io-index" |
104 | diff --git a/cmd/maitred-debug/src/main.rs b/cmd/maitred-debug/src/main.rs |
105 | index 2057c8d..5a2d19b 100644 |
106 | --- a/cmd/maitred-debug/src/main.rs |
107 | +++ b/cmd/maitred-debug/src/main.rs |
108 | @@ -1,15 +1,21 @@ |
109 | + use std::{ |
110 | + path::{Path, PathBuf}, |
111 | + sync::Arc, |
112 | + }; |
113 | + |
114 | use clap::Parser; |
115 | + use tokio::sync::{Mutex, RwLock}; |
116 | use tracing::Level; |
117 | |
118 | use maitred::{ |
119 | - mail_parser::Message, DeliveryError, DeliveryFunc, Error, MilterFunc, PlainAuthFunc, Server, |
120 | - SessionOptions, |
121 | + mail_parser::Message, Delivery, DeliveryError, DeliveryFunc, Envelope, Error, Maildir, |
122 | + MilterFunc, PlainAuthFunc, Server, SessionOptions, |
123 | }; |
124 | |
125 | - async fn print_message(message: Message<'static>) -> Result<(), DeliveryError> { |
126 | + async fn print_message(envelope: &Envelope) -> Result<(), DeliveryError> { |
127 | println!( |
128 | "New SMTP Message:\n{}", |
129 | - String::from_utf8_lossy(message.raw_message()) |
130 | + String::from_utf8_lossy(envelope.body.raw_message()) |
131 | ); |
132 | Ok(()) |
133 | } |
134 | @@ -17,12 +23,18 @@ async fn print_message(message: Message<'static>) -> Result<(), DeliveryError> { |
135 | #[derive(Parser, Debug)] |
136 | #[clap(author, version, about, long_about = None)] |
137 | struct Args { |
138 | - /// Enable DKIM Verification of Incoming E-Mail. |
139 | - #[clap(long, default_value_t = true)] |
140 | + /// Enable DKIM verification of incoming e-mail. |
141 | + #[clap(long, default_value_t = false)] |
142 | dkim: bool, |
143 | - /// Enagle SPF Verification of Incoming E-Mail. |
144 | - #[clap(long, default_value_t = true)] |
145 | + /// Enable SPF verification of incoming e-mail. |
146 | + #[clap(long, default_value_t = false)] |
147 | spf: bool, |
148 | + /// Addresses from which to accept e-mail for |
149 | + #[clap(long)] |
150 | + addresses: Vec<String>, |
151 | + /// Path to a directory to store e-mails in the maildir format |
152 | + #[clap(long, default_value_t = String::from("mail"))] |
153 | + maildir: String, |
154 | } |
155 | |
156 | #[tokio::main] |
157 | @@ -34,6 +46,8 @@ async fn main() -> Result<(), Error> { |
158 | .with_line_number(true) |
159 | .with_max_level(Level::DEBUG) |
160 | .init(); |
161 | + let maildir_path = PathBuf::from(&args.maildir); |
162 | + let addresses = args.addresses.clone(); |
163 | // Set the subscriber as the default subscriber |
164 | let mut mail_server = Server::default() |
165 | .address("127.0.0.1:2525") |
166 | @@ -41,9 +55,15 @@ async fn main() -> Result<(), Error> { |
167 | let message = message.clone(); |
168 | async move { Ok(message.to_owned()) } |
169 | })) |
170 | - .with_delivery(DeliveryFunc(|message: &Message<'static>| { |
171 | - let message = message.clone(); |
172 | - async move { print_message(message.to_owned()).await } |
173 | + .with_delivery(DeliveryFunc(move |envelope: &Envelope| { |
174 | + let maildir_path = maildir_path.clone(); |
175 | + let addresses = addresses.clone(); |
176 | + let cloned = envelope.clone(); |
177 | + async move { |
178 | + print_message(&cloned).await?; |
179 | + let maildir = Maildir::new(maildir_path.as_path(), addresses.as_slice())?; |
180 | + maildir.deliver(&cloned).await |
181 | + } |
182 | })) |
183 | .dkim_verification(args.dkim) |
184 | .spf_verification(args.spf) |
185 | diff --git a/maitred/Cargo.toml b/maitred/Cargo.toml |
186 | index 514da87..f92a9b7 100644 |
187 | --- a/maitred/Cargo.toml |
188 | +++ b/maitred/Cargo.toml |
189 | @@ -13,6 +13,7 @@ futures = "0.3.30" |
190 | mail-auth = "0.5.0" |
191 | mail-builder = "0.3.2" |
192 | mail-parser = { version = "0.9.3", features = ["serde", "serde_support"] } |
193 | + maildir = "0.6.4" |
194 | md5 = "0.7.0" |
195 | smtp-proto = { version = "0.1.5", features = ["serde", "serde_support"] } |
196 | stringprep = "0.1.5" |
197 | diff --git a/maitred/src/delivery.rs b/maitred/src/delivery.rs |
198 | index e8ecb6d..bf0d4cd 100644 |
199 | --- a/maitred/src/delivery.rs |
200 | +++ b/maitred/src/delivery.rs |
201 | @@ -1,28 +1,38 @@ |
202 | - use std::future::Future; |
203 | + use std::{ |
204 | + collections::BTreeMap, |
205 | + future::Future, |
206 | + io::Error as IoError, |
207 | + path::{Path, PathBuf}, |
208 | + }; |
209 | |
210 | use async_trait::async_trait; |
211 | use mail_parser::Message; |
212 | + use maildir::Maildir as MaildirInner; |
213 | + |
214 | + use crate::Envelope; |
215 | |
216 | #[derive(Debug, thiserror::Error)] |
217 | pub enum DeliveryError { |
218 | /// Indicates an unspecified error that occurred during milting. |
219 | #[error("Internal Server Error: {0}")] |
220 | Server(String), |
221 | + #[error("IO Error: {0}")] |
222 | + Io(#[from] IoError), |
223 | } |
224 | |
225 | /// Delivery is the final stage of accepting an e-mail and may be invoked |
226 | /// multiple times depending on the server configuration. |
227 | #[async_trait] |
228 | pub trait Delivery: Sync + Send { |
229 | - async fn deliver(&self, message: &Message<'static>) -> Result<(), DeliveryError>; |
230 | + /// Persist an e-mail message in some way |
231 | + async fn deliver(&self, message: &Envelope) -> Result<(), DeliveryError>; |
232 | } |
233 | |
234 | /// DeliveryFunc wraps an async closure implementing the Delivery trait. |
235 | /// ```rust |
236 | - /// use maitred::DeliveryFunc; |
237 | - /// use maitred::mail_parser::Message; |
238 | + /// use maitred::{DeliveryFunc, Envelope}; |
239 | /// |
240 | - /// let delivery = DeliveryFunc(|message: &Message<'static>| { |
241 | + /// let delivery = DeliveryFunc(|message: &Envelope| { |
242 | /// async move { |
243 | /// Ok(()) |
244 | /// } |
245 | @@ -30,17 +40,59 @@ pub trait Delivery: Sync + Send { |
246 | /// ``` |
247 | pub struct DeliveryFunc<F, T>(pub F) |
248 | where |
249 | - F: Fn(&Message<'static>) -> T + Sync + Send, |
250 | + F: Fn(&Envelope) -> T + Sync + Send, |
251 | T: Future<Output = Result<(), DeliveryError>> + Send; |
252 | |
253 | #[async_trait] |
254 | impl<F, T> Delivery for DeliveryFunc<F, T> |
255 | where |
256 | - F: Fn(&Message<'static>) -> T + Sync + Send, |
257 | + F: Fn(&Envelope) -> T + Sync + Send, |
258 | T: Future<Output = Result<(), DeliveryError>> + Send, |
259 | { |
260 | - async fn deliver(&self, message: &Message<'static>) -> Result<(), DeliveryError> { |
261 | + async fn deliver(&self, message: &Envelope) -> Result<(), DeliveryError> { |
262 | let f = (self.0)(message); |
263 | f.await |
264 | } |
265 | } |
266 | + |
267 | + /// Maildir stores incoming e-mail on the file system in the Maildir format. |
268 | + pub struct Maildir { |
269 | + maildirs: BTreeMap<String, MaildirInner>, |
270 | + } |
271 | + |
272 | + impl Maildir { |
273 | + /// Initialize a new Maildir on the file system. |
274 | + pub fn new(path: &Path, addresses: &[String]) -> Result<Self, std::io::Error> { |
275 | + let maildirs: Result<Vec<(String, MaildirInner)>, std::io::Error> = addresses |
276 | + .iter() |
277 | + .map(|address| { |
278 | + let mbox_dir = path.join(address); |
279 | + let maildir: MaildirInner = mbox_dir.into(); |
280 | + maildir.create_dirs()?; |
281 | + Ok((address.to_string(), maildir)) |
282 | + }) |
283 | + .collect(); |
284 | + let maildirs = maildirs?; |
285 | + Ok(Maildir { |
286 | + maildirs: maildirs.into_iter().collect(), |
287 | + }) |
288 | + } |
289 | + } |
290 | + |
291 | + #[async_trait] |
292 | + impl Delivery for Maildir { |
293 | + async fn deliver(&self, message: &Envelope) -> Result<(), DeliveryError> { |
294 | + for rcpt in message.rcpt_to.iter() { |
295 | + if let Some(maildir) = self.maildirs.get(&rcpt.email()) { |
296 | + maildir |
297 | + .store_new(message.body.raw_message()) |
298 | + .map_err(|e| match e { |
299 | + maildir::MaildirError::Io(io_err) => DeliveryError::Io(io_err), |
300 | + maildir::MaildirError::Utf8(_) => unreachable!(), |
301 | + maildir::MaildirError::Time(e) => DeliveryError::Server(e.to_string()), |
302 | + })?; |
303 | + } |
304 | + } |
305 | + Ok(()) |
306 | + } |
307 | + } |
308 | diff --git a/maitred/src/lib.rs b/maitred/src/lib.rs |
309 | index 736e17d..7961e09 100644 |
310 | --- a/maitred/src/lib.rs |
311 | +++ b/maitred/src/lib.rs |
312 | @@ -3,18 +3,18 @@ |
313 | //! # Example SMTP Server |
314 | //! ```rust |
315 | //! use maitred::{ |
316 | - //! mail_parser::Message, DeliveryError, DeliveryFunc, MilterFunc, Error, |
317 | - //! Milter, Server, SessionOptions, |
318 | + //! mail_parser::Message, Envelope, DeliveryError, DeliveryFunc, MilterFunc, Error, |
319 | + //! Milter, Server, SessionOptions, |
320 | //! }; |
321 | //! use tracing::Level; |
322 | //! |
323 | - //! async fn print_message(message: Message<'static>) -> Result<(), DeliveryError> { |
324 | + //! async fn print_message(envelope: &Envelope) -> Result<(), DeliveryError> { |
325 | //! println!("New SMTP Message:"); |
326 | - //! println!("{:?}", message.headers()); |
327 | - //! println!("Subject: {:?}", message.subject()); |
328 | + //! println!("{:?}", envelope.body.headers()); |
329 | + //! println!("Subject: {:?}", envelope.body.subject()); |
330 | //! println!( |
331 | //! "{}", |
332 | - //! message |
333 | + //! envelope.body |
334 | //! .body_text(0) |
335 | //! .map(|text| String::from_utf8_lossy(text.as_bytes()).to_string()) |
336 | //! .unwrap_or_default() |
337 | @@ -37,9 +37,9 @@ |
338 | //! let message = message.clone(); |
339 | //! async move { Ok(message.to_owned()) } |
340 | //! })) |
341 | - //! .with_delivery(DeliveryFunc(|message: &Message<'static>| { |
342 | - //! let message = message.clone(); |
343 | - //! async move { print_message(message.to_owned()).await } |
344 | + //! .with_delivery(DeliveryFunc(|envelope: &Envelope| { |
345 | + //! let envelope = envelope.clone(); |
346 | + //! async move { print_message(&envelope).await } |
347 | //! })) |
348 | //! .with_session_opts(SessionOptions::default()); |
349 | //! // mail_server.listen().await?; |
350 | @@ -69,13 +69,13 @@ use smtp_proto::Response as SmtpResponse; |
351 | use transport::Response; |
352 | |
353 | pub use auth::{AuthError, PlainAuth, PlainAuthFunc}; |
354 | - pub use delivery::{Delivery, DeliveryError, DeliveryFunc}; |
355 | + pub use delivery::{Delivery, DeliveryError, DeliveryFunc, Maildir}; |
356 | pub use expand::{Expansion, ExpansionError, ExpansionFunc}; |
357 | pub use milter::{Milter, MilterError, MilterFunc}; |
358 | pub use verify::{Verify, VerifyError, VerifyFunc}; |
359 | |
360 | pub use error::Error; |
361 | - pub use server::Server; |
362 | + pub use server::{Envelope, Server}; |
363 | pub use session::{ |
364 | SessionOptions, DEFAULT_CAPABILITIES, DEFAULT_GREETING, DEFAULT_HELP_BANNER, |
365 | DEFAULT_MAXIMUM_MESSAGE_SIZE, |
366 | diff --git a/maitred/src/server.rs b/maitred/src/server.rs |
367 | index 78aa2ea..52f7274 100644 |
368 | --- a/maitred/src/server.rs |
369 | +++ b/maitred/src/server.rs |
370 | @@ -5,9 +5,11 @@ use std::time::Duration; |
371 | use crossbeam_deque::Injector; |
372 | use crossbeam_deque::Stealer; |
373 | use crossbeam_deque::Worker as WorkQueue; |
374 | + use email_address::EmailAddress; |
375 | use futures::SinkExt; |
376 | use futures::StreamExt; |
377 | use mail_auth::Resolver; |
378 | + use mail_parser::Message; |
379 | use smtp_proto::Request; |
380 | use tokio::net::TcpListener; |
381 | use tokio::sync::mpsc::Sender; |
382 | @@ -16,6 +18,7 @@ use tokio::task::JoinHandle; |
383 | use tokio::time::timeout; |
384 | use tokio_stream::{self as stream}; |
385 | use tokio_util::codec::Framed; |
386 | + use url::Host; |
387 | |
388 | use crate::delivery::Delivery; |
389 | use crate::error::Error; |
390 | @@ -23,7 +26,7 @@ use crate::milter::Milter; |
391 | use crate::session::{Session, SessionOptions}; |
392 | use crate::smtp_response; |
393 | use crate::transport::{Command, Transport, TransportError}; |
394 | - use crate::worker::{Packet, Worker}; |
395 | + use crate::worker::Worker; |
396 | use crate::{Response, SmtpResponse}; |
397 | |
398 | /// The default port the server will listen on if none was specified in it's |
399 | @@ -53,6 +56,26 @@ pub(crate) enum ClientError { |
400 | Timeout(u64), |
401 | } |
402 | |
403 | + /// Session details to be passed internally for processing |
404 | + #[derive(Clone, Debug)] |
405 | + pub struct Envelope { |
406 | + pub body: Message<'static>, |
407 | + pub mail_from: EmailAddress, |
408 | + pub rcpt_to: Vec<EmailAddress>, |
409 | + pub hostname: Host, |
410 | + } |
411 | + |
412 | + impl From<&Session> for Envelope { |
413 | + fn from(value: &Session) -> Self { |
414 | + Envelope { |
415 | + body: value.body.clone().unwrap(), |
416 | + mail_from: value.mail_from.clone().unwrap(), |
417 | + rcpt_to: value.rcpt_to.clone().unwrap(), |
418 | + hostname: value.hostname.clone().unwrap(), |
419 | + } |
420 | + } |
421 | + } |
422 | + |
423 | /// Server implements everything that is required to run an SMTP server by |
424 | /// binding to the configured address and processing individual TCP connections |
425 | /// as they are received. |
426 | @@ -144,7 +167,7 @@ impl Server { |
427 | async fn process<T>( |
428 | &self, |
429 | mut framed: Framed<T, Transport>, |
430 | - msg_queue: Arc<Injector<Packet>>, |
431 | + msg_queue: Arc<Injector<Envelope>>, |
432 | ) -> Result<(), ClientError> |
433 | where |
434 | T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin, |
435 | @@ -195,7 +218,7 @@ impl Server { |
436 | for response in responses { |
437 | framed.send(response).await?; |
438 | } |
439 | - msg_queue.push(Packet::from(&session)); |
440 | + msg_queue.push(Envelope::from(&session)); |
441 | } |
442 | Err(response) => { |
443 | tracing::warn!("Error handling message payload: {:?}", response); |
444 | @@ -267,11 +290,11 @@ impl Server { |
445 | Ok(()) |
446 | } |
447 | |
448 | - async fn spawn_workers(&mut self, global_queue: Arc<Injector<Packet>>) { |
449 | - let local_queues: Vec<WorkQueue<Packet>> = (0..self.n_threads) |
450 | - .map(|_| WorkQueue::<Packet>::new_fifo()) |
451 | + async fn spawn_workers(&mut self, global_queue: Arc<Injector<Envelope>>) { |
452 | + let local_queues: Vec<WorkQueue<Envelope>> = (0..self.n_threads) |
453 | + .map(|_| WorkQueue::<Envelope>::new_fifo()) |
454 | .collect(); |
455 | - let stealers: Vec<Stealer<Packet>> = local_queues |
456 | + let stealers: Vec<Stealer<Envelope>> = local_queues |
457 | .iter() |
458 | .map(|local_queue| local_queue.stealer()) |
459 | .collect(); |
460 | @@ -325,7 +348,7 @@ impl Server { |
461 | } else { |
462 | None |
463 | }; |
464 | - let global_queue = Arc::new(Injector::<Packet>::new()); |
465 | + let global_queue = Arc::new(Injector::<Envelope>::new()); |
466 | self.spawn_workers(global_queue.clone()).await; |
467 | loop { |
468 | let (socket, addr) = listener.accept().await.unwrap(); |
469 | @@ -427,16 +450,14 @@ mod test { |
470 | // turn off all extended capabilities |
471 | .with_session_opts(SessionOptions::default().capabilities(0)); |
472 | let framed = Framed::new(stream, Transport::default()); |
473 | - let global_queue = Arc::new(Injector::<Packet>::new()); |
474 | + let global_queue = Arc::new(Injector::<Envelope>::new()); |
475 | server.process(framed, global_queue.clone()).await.unwrap(); |
476 | let packet = global_queue.steal().success().unwrap(); |
477 | + assert!(packet.mail_from.email() == "fuu@bar.com"); |
478 | assert!(packet |
479 | - .mail_from |
480 | - .as_ref() |
481 | - .is_some_and(|mail_from| mail_from.email() == "fuu@bar.com")); |
482 | - assert!(packet.rcpt_to.as_ref().is_some_and(|rcpts| rcpts |
483 | + .rcpt_to |
484 | .first() |
485 | - .is_some_and(|rcpt_to| rcpt_to.email() == "baz@qux.com"))); |
486 | + .is_some_and(|rcpt_to| rcpt_to.email() == "baz@qux.com")); |
487 | } |
488 | |
489 | #[tokio::test] |
490 | @@ -454,15 +475,13 @@ mod test { |
491 | }; |
492 | let server = Server::default(); |
493 | let framed = Framed::new(stream, Transport::default()); |
494 | - let global_queue = Arc::new(Injector::<Packet>::new()); |
495 | + let global_queue = Arc::new(Injector::<Envelope>::new()); |
496 | server.process(framed, global_queue.clone()).await.unwrap(); |
497 | let packet = global_queue.steal().success().unwrap(); |
498 | + assert!(packet.mail_from.email() == "fuu@bar.com"); |
499 | assert!(packet |
500 | - .mail_from |
501 | - .as_ref() |
502 | - .is_some_and(|mail_from| mail_from.email() == "fuu@bar.com")); |
503 | - assert!(packet.rcpt_to.as_ref().is_some_and(|rcpts| rcpts |
504 | + .rcpt_to |
505 | .first() |
506 | - .is_some_and(|rcpt_to| rcpt_to.email() == "baz@qux.com"))); |
507 | + .is_some_and(|rcpt_to| rcpt_to.email() == "baz@qux.com")); |
508 | } |
509 | } |
510 | diff --git a/maitred/src/worker.rs b/maitred/src/worker.rs |
511 | index 652c881..293d5b1 100644 |
512 | --- a/maitred/src/worker.rs |
513 | +++ b/maitred/src/worker.rs |
514 | @@ -2,41 +2,17 @@ use std::sync::Arc; |
515 | use std::{iter, time::Duration}; |
516 | |
517 | use crossbeam_deque::{Injector, Stealer, Worker as WorkQueue}; |
518 | - use email_address::EmailAddress; |
519 | use mail_auth::Resolver; |
520 | - use mail_parser::Message; |
521 | use tokio::sync::{mpsc::Receiver, Mutex}; |
522 | - use url::Host; |
523 | |
524 | use crate::delivery::Delivery; |
525 | use crate::milter::Milter; |
526 | use crate::rewrite::Rewrite; |
527 | - use crate::session::Session; |
528 | use crate::validation::Validation; |
529 | - use crate::Error; |
530 | + use crate::{Envelope, Error}; |
531 | |
532 | const HEADER_DKIM_RESULT: &str = "Maitred-Dkim-Result"; |
533 | |
534 | - /// Session details to be passed internally for processing |
535 | - #[derive(Clone, Debug)] |
536 | - pub(crate) struct Packet { |
537 | - pub body: Option<Message<'static>>, |
538 | - pub mail_from: Option<EmailAddress>, |
539 | - pub rcpt_to: Option<Vec<EmailAddress>>, |
540 | - pub hostname: Option<Host>, |
541 | - } |
542 | - |
543 | - impl From<&Session> for Packet { |
544 | - fn from(value: &Session) -> Self { |
545 | - Packet { |
546 | - body: value.body.clone(), |
547 | - mail_from: value.mail_from.clone(), |
548 | - rcpt_to: value.rcpt_to.clone(), |
549 | - hostname: value.hostname.clone(), |
550 | - } |
551 | - } |
552 | - } |
553 | - |
554 | /// Worker is responsible for all asynchronous message processing after a |
555 | /// session has been completed. It will handle the following operations: |
556 | /// |
557 | @@ -47,16 +23,16 @@ impl From<&Session> for Packet { |
558 | pub(crate) struct Worker { |
559 | pub milter: Option<Arc<dyn Milter>>, |
560 | pub delivery: Option<Arc<dyn Delivery>>, |
561 | - pub global_queue: Arc<Injector<Packet>>, |
562 | - pub stealers: Vec<Stealer<Packet>>, |
563 | - pub local_queue: Arc<Mutex<WorkQueue<Packet>>>, |
564 | + pub global_queue: Arc<Injector<Envelope>>, |
565 | + pub stealers: Vec<Stealer<Envelope>>, |
566 | + pub local_queue: Arc<Mutex<WorkQueue<Envelope>>>, |
567 | pub shutdown_rx: Receiver<bool>, |
568 | pub resolver: Option<Arc<Mutex<Resolver>>>, |
569 | pub dkim_verification: bool, |
570 | } |
571 | |
572 | impl Worker { |
573 | - async fn next_packet(&self) -> Option<Packet> { |
574 | + async fn next_packet(&self) -> Option<Envelope> { |
575 | let local_queue = self.local_queue.lock().await; |
576 | local_queue.pop().or_else(|| { |
577 | iter::repeat_with(|| { |
578 | @@ -80,8 +56,8 @@ impl Worker { |
579 | break Ok(()); |
580 | } |
581 | |
582 | - if let Some(packet) = self.next_packet().await { |
583 | - let mut message = packet.body.unwrap(); |
584 | + if let Some(envelope) = self.next_packet().await { |
585 | + let mut message = envelope.body; |
586 | let message_bytes = message.raw_message(); |
587 | let message_id = message |
588 | .message_id() |
589 | @@ -92,10 +68,7 @@ impl Worker { |
590 | tracing::info!("DKIM Verification for {}", message_id); |
591 | let resolver = self.resolver.as_ref().expect("Resolver not configured"); |
592 | let resolver = resolver.lock().await; |
593 | - let passed = |
594 | - Validation(resolver) |
595 | - .verify_dkim(message_bytes) |
596 | - .await; |
597 | + let passed = Validation(resolver).verify_dkim(message_bytes).await; |
598 | dkim_passed = Some(passed); |
599 | } |
600 | if let Some(milter) = &self.milter { |
601 | @@ -121,7 +94,15 @@ impl Worker { |
602 | |
603 | if let Some(delivery) = &self.delivery { |
604 | tracing::info!("Delivering message {}", message_id); |
605 | - match delivery.deliver(&to_deliver.into_owned()).await { |
606 | + match delivery |
607 | + .deliver(&Envelope { |
608 | + body: to_deliver.into_owned(), |
609 | + mail_from: envelope.mail_from.clone(), |
610 | + rcpt_to: envelope.rcpt_to.clone(), |
611 | + hostname: envelope.hostname.clone(), |
612 | + }) |
613 | + .await |
614 | + { |
615 | Ok(_) => tracing::info!("Message successfully delivered"), |
616 | // TODO: Implement retry here |
617 | Err(err) => tracing::warn!("Message could not be delievered: {:?}", err), |