Author: Kevin Schoon [me@kevinschoon.com]
Hash: fb6280d069c25cb4a6be4910fd9cee30c4193483
Timestamp: Sun, 06 Oct 2024 12:02:54 +0000 (1 week ago)

+1273 -998 +/-16 browse
re-write session to work like a state machine, lots of other refactoring
re-write session to work like a state machine, lots of other refactoring

Session now works like a state machine and the server functionality is
included as a feature. Lots of other cleanup related to configuration and
usability as a library.
1diff --git a/Cargo.lock b/Cargo.lock
2index 3164742..ea2be38 100644
3--- a/Cargo.lock
4+++ b/Cargo.lock
5 @@ -122,9 +122,9 @@ dependencies = [
6
7 [[package]]
8 name = "async-trait"
9- version = "0.1.81"
10+ version = "0.1.83"
11 source = "registry+https://github.com/rust-lang/crates.io-index"
12- checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
13+ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd"
14 dependencies = [
15 "proc-macro2",
16 "quote",
17 @@ -1112,8 +1112,10 @@ dependencies = [
18 name = "maitred-debug"
19 version = "0.1.0"
20 dependencies = [
21+ "async-trait",
22 "clap",
23 "futures",
24+ "maildir",
25 "maitred",
26 "serde",
27 "tokio",
28 diff --git a/cmd/maitred-debug/Cargo.toml b/cmd/maitred-debug/Cargo.toml
29index 973dda8..779b87e 100644
30--- a/cmd/maitred-debug/Cargo.toml
31+++ b/cmd/maitred-debug/Cargo.toml
32 @@ -4,10 +4,11 @@ version = "0.1.0"
33 edition = "2021"
34
35 [dependencies]
36+ async-trait = "0.1.83"
37 clap = { version = "4.5.16", features = ["derive"] }
38 futures = "0.3.30"
39-
40- maitred = {path = "../../maitred"}
41+ maildir = "0.6.4"
42+ maitred = {path = "../../maitred", features = ["full"]}
43 serde = "1.0.209"
44 tokio = { version = "1.39.2", features = ["full"] }
45 toml = "0.8.19"
46 diff --git a/cmd/maitred-debug/src/main.rs b/cmd/maitred-debug/src/main.rs
47index 6422912..b5f8044 100644
48--- a/cmd/maitred-debug/src/main.rs
49+++ b/cmd/maitred-debug/src/main.rs
50 @@ -1,25 +1,19 @@
51+ use std::collections::BTreeMap;
52 use std::fs::read_to_string;
53 use std::path::{Path, PathBuf};
54
55 use clap::Parser;
56+ use maildir::Maildir;
57 use toml::from_str;
58 use tracing::Level;
59
60 mod config;
61
62- use maitred::auth::PlainAuthFunc;
63- use maitred::delivery::{Delivery, DeliveryError, DeliveryFunc, Maildir};
64+ use maitred::delivery::{Delivery, DeliveryError, DeliveryFunc};
65 use maitred::mail_parser::Message;
66 use maitred::milter::MilterFunc;
67- use maitred::{Envelope, Server, SessionOptions};
68-
69- async fn print_message(envelope: &Envelope) -> Result<(), DeliveryError> {
70- println!(
71- "New SMTP Message:\n{}",
72- String::from_utf8_lossy(envelope.body.raw_message())
73- );
74- Ok(())
75- }
76+ use maitred::server::Server;
77+ use maitred::session::Envelope;
78
79 const LONG_ABOUT: &str = r#"
80 Maitred SMTP Demo Server
81 @@ -37,6 +31,55 @@ struct Args {
82 config: String,
83 }
84
85+ /// FSDelivery stores incoming e-mail on the file system in the Maildir format
86+ /// for each address it's configured to handle.
87+ pub struct FSDelivery {
88+ maildirs: BTreeMap<String, Maildir>,
89+ }
90+
91+ impl FSDelivery {
92+ /// Initialize a new Maildir on the file system.
93+ pub fn new(path: &Path, addresses: &[String]) -> Result<Self, std::io::Error> {
94+ let maildirs: Result<Vec<(String, Maildir)>, std::io::Error> = addresses
95+ .iter()
96+ .map(|address| {
97+ let mbox_dir = path.join(address);
98+ let maildir: Maildir = mbox_dir.into();
99+ maildir.create_dirs()?;
100+ Ok((address.to_string(), maildir))
101+ })
102+ .collect();
103+ let maildirs = maildirs?;
104+ Ok(FSDelivery {
105+ maildirs: maildirs.into_iter().collect(),
106+ })
107+ }
108+ }
109+
110+ #[async_trait::async_trait]
111+ impl Delivery for FSDelivery {
112+ async fn deliver(&self, message: &Envelope) -> Result<(), DeliveryError> {
113+ println!(
114+ "New SMTP Message:\n{}",
115+ String::from_utf8_lossy(message.body.raw_message())
116+ );
117+ for rcpt in message.rcpt_to.iter() {
118+ if let Some(maildir) = self.maildirs.get(&rcpt.email()) {
119+ maildir
120+ .store_new(message.body.raw_message())
121+ .map_err(|e| match e {
122+ maildir::MaildirError::Io(io_err) => DeliveryError::Io(io_err),
123+ maildir::MaildirError::Utf8(_) => unreachable!(),
124+ maildir::MaildirError::Time(e) => DeliveryError::Server(e.to_string()),
125+ })?;
126+ } else {
127+ tracing::warn!("Ignoring unknown e-mail account: {}", rcpt);
128+ }
129+ }
130+ Ok(())
131+ }
132+ }
133+
134 #[tokio::main]
135 async fn main() -> Result<(), Box<dyn std::error::Error>> {
136 let args = Args::parse();
137 @@ -48,52 +91,34 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
138 .with_line_number(true)
139 .with_max_level(Level::DEBUG)
140 .init();
141- let maildir_path = PathBuf::from(&config.maildir);
142 let accounts = config.accounts.clone();
143 let addresses: Vec<String> = accounts
144 .iter()
145 .map(|account| account.address.clone())
146 .collect();
147 // initialize maildirs before starting
148- let _ = Maildir::new(maildir_path.as_path(), &addresses)?;
149- // Set the subscriber as the default subscriber
150- let mut session_opts = SessionOptions::default().plain_auth(PlainAuthFunc(
151- |authcid: &str, authzid: &str, _passwd: &str| {
152- println!("AUTHCID: {}, AUTHZID: {}", authcid, authzid);
153- async move { Ok(()) }
154- },
155- ));
156+ let delivery = FSDelivery::new(Path::new(&config.maildir), &addresses)?;
157 let mut mail_server = Server::default()
158 .address(&config.address)
159 .with_milter(MilterFunc(|message: &Message<'static>| {
160 let message = message.clone();
161 async move { Ok(message.to_owned()) }
162 }))
163- .with_delivery(DeliveryFunc(move |envelope: &Envelope| {
164- let maildir_path = maildir_path.clone();
165- let addresses = addresses.clone();
166- let cloned = envelope.clone();
167- async move {
168- print_message(&cloned).await?;
169- let maildir = Maildir::new(maildir_path.as_path(), &addresses)?;
170- maildir.deliver(&cloned).await?;
171- Ok(())
172- }
173- }))
174+ .with_delivery(delivery)
175 .dkim_verification(config.dkim.enabled)
176 .spf_verification(config.spf.enabled);
177
178 if let Some(tls_config) = config.tls {
179 tracing::info!("TLS enabled");
180 mail_server = mail_server.with_certificates(&tls_config.key, &tls_config.certificate);
181- session_opts = session_opts.starttls_enabled(true);
182+ // session_opts = session_opts.starttls_enabled(true);
183 }
184
185 if config.proxy_protocol.is_some_and(|enabled| enabled) {
186 mail_server = mail_server.proxy_protocol(true);
187 };
188
189- mail_server = mail_server.with_session_opts(session_opts);
190+ // mail_server = mail_server.with_session_opts(session_opts);
191 mail_server.listen().await?;
192 Ok(())
193 }
194 diff --git a/maitred.toml b/maitred.toml
195index d09b5f6..785fa42 100644
196--- a/maitred.toml
197+++ b/maitred.toml
198 @@ -17,9 +17,12 @@ key = "key.pem"
199 [spf]
200 enabled = false
201
202- # List of user accounts and their hard coded username / passwords
203+
204 [[accounts]]
205 address = "demo-1@example.org"
206
207 [[accounts]]
208 address = "demo-2@example.org"
209+
210+ [[accounts]]
211+ address = "hello@example.org"
212 diff --git a/maitred/Cargo.toml b/maitred/Cargo.toml
213index 76c4153..fb2c193 100644
214--- a/maitred/Cargo.toml
215+++ b/maitred/Cargo.toml
216 @@ -30,3 +30,8 @@ url = "2.5.2"
217
218 [dev-dependencies]
219 tracing-subscriber = "0.3.18"
220+
221+ [features]
222+ default = []
223+ full = ["server"]
224+ server = []
225 diff --git a/maitred/src/auth.rs b/maitred/src/auth.rs
226index 7a0c25d..133acff 100644
227--- a/maitred/src/auth.rs
228+++ b/maitred/src/auth.rs
229 @@ -4,7 +4,8 @@ use async_trait::async_trait;
230 use base64::{prelude::*, DecodeError};
231 use stringprep::{saslprep, Error as SaslPrepError};
232
233- use crate::{smtp_response, Response};
234+ use crate::smtp_response;
235+ use crate::session::Response;
236 use smtp_proto::Response as SmtpResponse;
237
238 #[derive(Debug, thiserror::Error)]
239 diff --git a/maitred/src/delivery.rs b/maitred/src/delivery.rs
240index 510b478..4e402f0 100644
241--- a/maitred/src/delivery.rs
242+++ b/maitred/src/delivery.rs
243 @@ -1,9 +1,8 @@
244- use std::{collections::BTreeMap, future::Future, io::Error as IoError, path::Path};
245+ use std::{future::Future, io::Error as IoError};
246
247 use async_trait::async_trait;
248- use maildir::Maildir as MaildirInner;
249
250- use crate::Envelope;
251+ use crate::session::Envelope;
252
253 #[derive(Debug, thiserror::Error)]
254 pub enum DeliveryError {
255 @@ -23,7 +22,7 @@ pub trait Delivery: Sync + Send {
256 }
257
258 /// DeliveryFunc wraps an async closure implementing the Delivery trait.
259- /// ```rust
260+ /// ```FIXME
261 /// use maitred::delivery::DeliveryFunc;
262 /// use maitred::Envelope;
263 ///
264 @@ -49,47 +48,3 @@ where
265 f.await
266 }
267 }
268-
269- /// Maildir stores incoming e-mail on the file system in the Maildir format.
270- pub struct Maildir {
271- maildirs: BTreeMap<String, MaildirInner>,
272- }
273-
274- impl Maildir {
275- /// Initialize a new Maildir on the file system.
276- pub fn new(path: &Path, addresses: &[String]) -> Result<Self, std::io::Error> {
277- let maildirs: Result<Vec<(String, MaildirInner)>, std::io::Error> = addresses
278- .iter()
279- .map(|address| {
280- let mbox_dir = path.join(address);
281- let maildir: MaildirInner = mbox_dir.into();
282- maildir.create_dirs()?;
283- Ok((address.to_string(), maildir))
284- })
285- .collect();
286- let maildirs = maildirs?;
287- Ok(Maildir {
288- maildirs: maildirs.into_iter().collect(),
289- })
290- }
291- }
292-
293- #[async_trait]
294- impl Delivery for Maildir {
295- async fn deliver(&self, message: &Envelope) -> Result<(), DeliveryError> {
296- for rcpt in message.rcpt_to.iter() {
297- if let Some(maildir) = self.maildirs.get(&rcpt.email()) {
298- maildir
299- .store_new(message.body.raw_message())
300- .map_err(|e| match e {
301- maildir::MaildirError::Io(io_err) => DeliveryError::Io(io_err),
302- maildir::MaildirError::Utf8(_) => unreachable!(),
303- maildir::MaildirError::Time(e) => DeliveryError::Server(e.to_string()),
304- })?;
305- } else {
306- tracing::warn!("Ignoring unknown e-mail account: {}", rcpt);
307- }
308- }
309- Ok(())
310- }
311- }
312 diff --git a/maitred/src/error.rs b/maitred/src/error.rs
313deleted file mode 100644
314index fcfc8c6..0000000
315--- a/maitred/src/error.rs
316+++ /dev/null
317 @@ -1,10 +0,0 @@
318- /// Any fatal error that is encountered by the server that should cause it
319- /// to shutdown and stop processing connections.
320- #[derive(Debug, thiserror::Error)]
321- pub enum Error {
322- /// An IO related error such as not being able to bind to a TCP socket
323- #[error("Io: {0}")]
324- Io(#[from] std::io::Error),
325- #[error("Proxy Protocol Error: {0}")]
326- ProxyProtocol(#[from] proxy_header::Error)
327- }
328 diff --git a/maitred/src/expand.rs b/maitred/src/expand.rs
329index 5be33e2..4bb9f9b 100644
330--- a/maitred/src/expand.rs
331+++ b/maitred/src/expand.rs
332 @@ -2,6 +2,10 @@ use std::future::Future;
333
334 use async_trait::async_trait;
335 use email_address::EmailAddress;
336+ use smtp_proto::Response as SmtpResponse;
337+
338+ use crate::session::Response;
339+ use crate::smtp_response;
340
341 /// An error encountered while expanding a mail address
342 #[derive(Debug, thiserror::Error)]
343 @@ -13,6 +17,15 @@ pub enum ExpansionError {
344 #[error("Group Not Found: {0}")]
345 NotFound(String),
346 }
347+ #[allow(clippy::from_over_into)]
348+ impl Into<Response<String>> for ExpansionError {
349+ fn into(self) -> Response<String> {
350+ match self {
351+ ExpansionError::Server(_) => smtp_response!(500, 0, 0, 0, self.to_string()),
352+ ExpansionError::NotFound(_) => smtp_response!(404, 0, 0, 0, self.to_string()),
353+ }
354+ }
355+ }
356
357 /// Expands a string representing a mailing list to an array of the associated
358 /// addresses within the list if it exists. NOTE: That this function should
359 diff --git a/maitred/src/lib.rs b/maitred/src/lib.rs
360index 0510814..07d67d0 100644
361--- a/maitred/src/lib.rs
362+++ b/maitred/src/lib.rs
363 @@ -3,13 +3,13 @@
364 //! but also for general use.
365 //!
366 //! # Example SMTP Server
367- //! ```rust
368+ //! ```rust,no_run
369 //! use maitred::auth::PlainAuthFunc;
370- //! use maitred::delivery::{Delivery, DeliveryError, DeliveryFunc, Maildir};
371- //! use maitred::Error;
372+ //! use maitred::delivery::{Delivery, DeliveryError, DeliveryFunc};
373 //! use maitred::mail_parser::Message;
374 //! use maitred::milter::MilterFunc;
375- //! use maitred::{Envelope, Server, SessionOptions};
376+ //! use maitred::server::{Server, ServerError};
377+ //! use maitred::session::Envelope;
378 //!
379 //! use tracing::Level;
380 //!
381 @@ -28,7 +28,7 @@
382 //! }
383 //!
384 //! #[tokio::main]
385- //! async fn main() -> Result<(), Error> {
386+ //! async fn main() -> Result<(), ServerError> {
387 //! // Create a subscriber that logs events to the console
388 //! tracing_subscriber::fmt()
389 //! .compact()
390 @@ -38,16 +38,11 @@
391 //! // Set the subscriber as the default subscriber
392 //! let mut mail_server = Server::default()
393 //! .address("127.0.0.1:2525")
394- //! .with_milter(MilterFunc(|message: &Message<'static>| {
395- //! let message = message.clone();
396- //! async move { Ok(message.to_owned()) }
397- //! }))
398 //! .with_delivery(DeliveryFunc(|envelope: &Envelope| {
399 //! let envelope = envelope.clone();
400 //! async move { print_message(&envelope).await }
401- //! }))
402- //! .with_session_opts(SessionOptions::default());
403- //! // mail_server.listen().await?;
404+ //! }));
405+ //! mail_server.listen().await?;
406 //! Ok(())
407 //! }
408 //! ```
409 @@ -56,7 +51,8 @@ pub use email_address;
410 pub use mail_parser;
411 pub use smtp_proto;
412
413- mod error;
414+ mod opportunistic;
415+ mod rewrite;
416
417 /// SMTP Authentication
418 pub mod auth;
419 @@ -68,24 +64,19 @@ pub mod expand;
420 pub mod milter;
421 /// Callback for implementing SMPT command VRFY
422 pub mod verify;
423+ /// Low level SMTP session without network transport
424+ pub mod session;
425
426- mod rewrite;
427- mod server;
428- mod session;
429+ /// Full featured tokio based TCP server for handling SMTP sessions
430+ #[cfg(feature = "server")]
431+ pub mod server;
432+ #[cfg(feature = "server")]
433 mod transport;
434+ #[cfg(feature = "server")]
435 mod validation;
436+ #[cfg(feature = "server")]
437 mod worker;
438
439- use smtp_proto::Response as SmtpResponse;
440- use transport::Response;
441-
442- pub use error::Error;
443- pub use server::{Envelope, Server};
444- pub use session::{
445- SessionOptions, DEFAULT_CAPABILITIES, DEFAULT_GREETING, DEFAULT_HELP_BANNER,
446- DEFAULT_MAXIMUM_MESSAGE_SIZE,
447- };
448-
449 /// Generate a single smtp_response
450 macro_rules! smtp_response {
451 ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
452 diff --git a/maitred/src/opportunistic.rs b/maitred/src/opportunistic.rs
453new file mode 100644
454index 0000000..4cf40e5
455--- /dev/null
456+++ b/maitred/src/opportunistic.rs
457 @@ -0,0 +1,62 @@
458+ use std::sync::Arc;
459+
460+ use futures::SinkExt;
461+ use futures::StreamExt;
462+ use tokio::sync::Mutex;
463+ use tokio_rustls::server::TlsStream;
464+ use tokio_util::codec::Framed;
465+
466+ use crate::session::Response;
467+ use crate::transport::{Command, Transport, TransportError};
468+
469+ /// Connection that is either over plain text or TLS
470+ pub(crate) trait Opportunistic {
471+ async fn send(&self, message: Response<String>) -> Result<(), TransportError>;
472+ async fn next(&self) -> Option<Result<Command, TransportError>>;
473+ }
474+
475+ /// Framed SMTP Transport over Plain Text
476+ pub(crate) struct Plain<'a, T>
477+ where
478+ T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin,
479+ {
480+ pub inner: Arc<Mutex<Framed<&'a mut T, Transport>>>,
481+ }
482+
483+ impl<'a, T> Opportunistic for Plain<'a, T>
484+ where
485+ T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin,
486+ {
487+ async fn send(&self, message: Response<String>) -> Result<(), TransportError> {
488+ let mut inner = self.inner.lock().await;
489+ inner.send(message).await
490+ }
491+
492+ async fn next(&self) -> Option<Result<Command, TransportError>> {
493+ let mut inner = self.inner.lock().await;
494+ inner.next().await
495+ }
496+ }
497+
498+ /// Framed SMTP Transport over TLS
499+ pub(crate) struct Tls<'a, T>
500+ where
501+ T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin,
502+ {
503+ pub inner: Arc<Mutex<Framed<TlsStream<&'a mut T>, Transport>>>,
504+ }
505+
506+ impl<'a, T> Opportunistic for Tls<'a, T>
507+ where
508+ T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin,
509+ {
510+ async fn send(&self, message: Response<String>) -> Result<(), TransportError> {
511+ let mut inner = self.inner.lock().await;
512+ inner.send(message).await
513+ }
514+
515+ async fn next(&self) -> Option<Result<Command, TransportError>> {
516+ let mut inner = self.inner.lock().await;
517+ inner.next().await
518+ }
519+ }
520 diff --git a/maitred/src/server.rs b/maitred/src/server.rs
521index b1acee3..2977285 100644
522--- a/maitred/src/server.rs
523+++ b/maitred/src/server.rs
524 @@ -8,13 +8,11 @@ use std::time::Duration;
525 use crossbeam_deque::Injector;
526 use crossbeam_deque::Stealer;
527 use crossbeam_deque::Worker as WorkQueue;
528- use email_address::EmailAddress;
529 use futures::SinkExt;
530 use futures::StreamExt;
531 use mail_auth::Resolver;
532- use mail_parser::Message;
533 use proxy_header::{ParseConfig, ProxyHeader};
534- use smtp_proto::Request;
535+ use smtp_proto::Response as SmtpResponse;
536 use tokio::net::TcpListener;
537 use tokio::sync::mpsc::Sender;
538 use tokio::sync::Mutex;
539 @@ -23,15 +21,17 @@ use tokio::time::timeout;
540 use tokio_rustls::{rustls, TlsAcceptor};
541 use tokio_stream::{self as stream};
542 use tokio_util::codec::Framed;
543- use url::Host;
544
545+ use crate::auth::PlainAuth;
546 use crate::delivery::Delivery;
547- use crate::error::Error;
548+ use crate::expand::Expansion;
549 use crate::milter::Milter;
550- use crate::session::{Session, SessionOptions};
551+ use crate::opportunistic::{Opportunistic, Plain, Tls};
552+ use crate::session::{Envelope, Response, Session};
553 use crate::transport::{Command, Transport, TransportError};
554+ use crate::validation::Validation;
555+ use crate::verify::Verify;
556 use crate::worker::Worker;
557- use crate::{Response, SmtpResponse};
558
559 /// The default port the server will listen on if none was specified in it's
560 /// configuration options.
561 @@ -41,20 +41,10 @@ pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:2525";
562 /// the connection.
563 pub const DEFAULT_GLOBAL_TIMEOUT_SECS: u64 = 300;
564
565- /// check if the final command is QUIT
566- fn is_quit(reqs: &[Request<String>]) -> bool {
567- reqs.last().is_some_and(|req| matches!(req, Request::Quit))
568- }
569-
570- fn is_starttls(reqs: &[Request<String>]) -> bool {
571- reqs.last()
572- .is_some_and(|req| matches!(req, Request::StartTls))
573- }
574-
575 /// Top level error encountered while processing a client connection, causes
576 /// a warning to be logged but is not fatal.
577 #[derive(Debug, thiserror::Error)]
578- pub(crate) enum ServerError {
579+ pub enum ServerError {
580 /// An IO related error such as not being able to bind to a TCP socket
581 #[error("Io: {0}")]
582 Io(#[from] std::io::Error),
583 @@ -65,31 +55,14 @@ pub(crate) enum ServerError {
584 Timeout(u64),
585 #[error("Failed to configure TLS: {0}")]
586 TlsConfiguration(#[from] rustls::Error),
587- }
588-
589- /// Session details to be passed internally for processing
590- #[derive(Clone, Debug)]
591- pub struct Envelope {
592- pub body: Message<'static>,
593- pub mail_from: EmailAddress,
594- pub rcpt_to: Vec<EmailAddress>,
595- pub hostname: Host,
596- }
597-
598- impl From<&Session> for Envelope {
599- fn from(value: &Session) -> Self {
600- Envelope {
601- body: value.body.clone().unwrap(),
602- mail_from: value.mail_from.clone().unwrap(),
603- rcpt_to: value.rcpt_to.clone().unwrap(),
604- hostname: value.hostname.clone().unwrap(),
605- }
606- }
607+ #[error("Proxy Protocol Error: {0}")]
608+ ProxyProtocol(#[from] proxy_header::Error),
609 }
610
611 /// Action for controlling a TCP session
612 pub(crate) enum Action {
613 Continue,
614+ Enqueue,
615 Shutdown,
616 TlsUpgrade,
617 }
618 @@ -100,16 +73,20 @@ pub(crate) enum Action {
619 pub struct Server {
620 address: String,
621 global_timeout: Duration,
622- options: Option<crate::session::SessionOptions>,
623+ pipelining: bool,
624 milter: Option<Arc<dyn Milter>>,
625 delivery: Option<Arc<dyn Delivery>>,
626 n_threads: usize,
627 shutdown_handles: Vec<Sender<bool>>,
628 dkim_verification: bool,
629 spf_verification: bool,
630+ list_expansion: Option<Arc<dyn Expansion>>,
631+ verification: Option<Arc<dyn Verify>>,
632+ plain_auth: Option<Arc<dyn PlainAuth>>,
633 resolver: Option<Arc<Mutex<Resolver>>>,
634 tls_certificates: Option<(PathBuf, PathBuf)>,
635 proxy_protocol: bool,
636+ session: Session,
637 }
638
639 impl Default for Server {
640 @@ -117,16 +94,20 @@ impl Default for Server {
641 Server {
642 address: DEFAULT_LISTEN_ADDR.to_string(),
643 global_timeout: Duration::from_secs(DEFAULT_GLOBAL_TIMEOUT_SECS),
644- options: None,
645+ pipelining: true,
646 milter: None,
647 delivery: None,
648 n_threads: std::thread::available_parallelism().unwrap().into(),
649 shutdown_handles: vec![],
650 dkim_verification: false,
651 spf_verification: false,
652+ list_expansion: None,
653+ plain_auth: None,
654+ verification: None,
655 resolver: None,
656 tls_certificates: None,
657 proxy_protocol: false,
658+ session: Session::default(),
659 }
660 }
661 }
662 @@ -146,11 +127,10 @@ impl Server {
663 self
664 }
665
666- /// Set session level options that affect the behavior of individual SMTP
667- /// sessions. Most custom behavior is implemented here but not specifying
668- /// any options will provide a limited but functional server.
669- pub fn with_session_opts(mut self, opts: SessionOptions) -> Self {
670- self.options = Some(opts);
671+ /// If piplining is supported in the transport, typically should be yes
672+ /// but the session could explicitly disable it.
673+ pub fn pipelining(mut self, enabled: bool) -> Self {
674+ self.pipelining = enabled;
675 self
676 }
677
678 @@ -172,6 +152,22 @@ impl Server {
679 self
680 }
681
682+ pub fn list_expansion<T>(mut self, expansion: T) -> Self
683+ where
684+ T: crate::expand::Expansion + 'static,
685+ {
686+ self.list_expansion = Some(Arc::new(expansion));
687+ self
688+ }
689+
690+ pub fn verification<T>(mut self, verification: T) -> Self
691+ where
692+ T: crate::verify::Verify + 'static,
693+ {
694+ self.verification = Some(Arc::new(verification));
695+ self
696+ }
697+
698 /// Perform DKIM Verification
699 pub fn dkim_verification(mut self, enabled: bool) -> Self {
700 self.dkim_verification = enabled;
701 @@ -184,6 +180,14 @@ impl Server {
702 self
703 }
704
705+ pub fn plain_auth<T>(mut self, plain_auth: T) -> Self
706+ where
707+ T: crate::auth::PlainAuth + 'static,
708+ {
709+ self.plain_auth = Some(Arc::new(plain_auth));
710+ self
711+ }
712+
713 /// TLS Certificates, implies that the server should listen for TLS
714 /// connections and maybe support STARTTLS if configured in the Session
715 /// options.
716 @@ -192,7 +196,7 @@ impl Server {
717 self
718 }
719
720- /// Enable support for HAProxy's
721+ /// Enable support for HAProxy's
722 /// [Proxy Protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt)
723 pub fn proxy_protocol(mut self, enabled: bool) -> Self {
724 self.proxy_protocol = enabled;
725 @@ -214,70 +218,135 @@ impl Server {
726 }
727
728 /// drive the session forward
729- async fn next<T>(
730+ async fn on_frame(
731 &self,
732- framed: &mut Framed<T, Transport>,
733+ conn: impl Opportunistic,
734 session: &mut Session,
735- queue: Arc<Injector<Envelope>>,
736- tls_active: bool,
737- ) -> Result<Action, ServerError>
738- where
739- T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin,
740- {
741- match timeout(self.global_timeout, framed.next()).await {
742- Ok(Some(Ok(Command::Requests(commands)))) => {
743- let shutdown = is_quit(commands.as_slice());
744- let starttls = is_starttls(commands.as_slice());
745- for command in commands {
746- match session.process(&command).await {
747- Ok(responses) => {
748+ ) -> Result<Action, ServerError> {
749+ match timeout(self.global_timeout, conn.next()).await {
750+ Ok(Some(Ok(Command::Requests(requests)))) => {
751+ for request in requests {
752+ let action = session.next(Some(&request));
753+ match action {
754+ crate::session::Action::Send(response) => {
755+ conn.send(response).await?;
756+ }
757+ crate::session::Action::SendMany(responses) => {
758 for response in responses {
759- framed.send(response).await?;
760+ conn.send(response).await?;
761 }
762 }
763- Err(e) => {
764- tracing::warn!("Client error: {:?}", e);
765- let fatal = e.is_fatal();
766- framed.send(e).await?;
767- if fatal {
768- return Ok(Action::Shutdown);
769- } else {
770- return Ok(Action::Continue);
771+ crate::session::Action::BDat {
772+ initial_response,
773+ cb,
774+ } => {
775+ conn.send(initial_response).await?;
776+ match conn.next().await {
777+ Some(Ok(Command::Payload(payload))) => match cb(payload) {
778+ crate::session::Action::Send(response) => {
779+ conn.send(response).await?;
780+ }
781+ _ => unreachable!(),
782+ },
783+ _ => unreachable!(),
784 }
785 }
786+ crate::session::Action::Data {
787+ initial_response,
788+ cb,
789+ } => {
790+ conn.send(initial_response).await?;
791+ match conn.next().await {
792+ Some(Ok(Command::Payload(payload))) => match cb(payload) {
793+ crate::session::Action::Send(response) => {
794+ conn.send(response).await?;
795+ return Ok(Action::Enqueue);
796+ }
797+ _ => unreachable!(),
798+ },
799+ _ => unreachable!(),
800+ }
801+ }
802+ crate::session::Action::SpfVerification {
803+ ip_addr,
804+ helo_domain,
805+ host_domain,
806+ mail_from,
807+ cb,
808+ } => {
809+ let resolver = self.resolver.as_ref().expect("resolver not configured");
810+ let resolver = resolver.lock().await;
811+ match cb(Validation(resolver)
812+ .verify_spf(ip_addr, &helo_domain, &host_domain, mail_from.as_str())
813+ .await)
814+ {
815+ crate::session::Action::Send(response) => {
816+ conn.send(response).await?;
817+ return Ok(Action::Continue);
818+ }
819+ _ => unreachable!(),
820+ }
821+ }
822+ crate::session::Action::PlainAuth {
823+ authcid,
824+ authzid,
825+ password,
826+ cb,
827+ } => {
828+ let plain_auth = self
829+ .plain_auth
830+ .as_ref()
831+ .expect("authentication not available");
832+ match cb(plain_auth.authenticate(&authcid, &authzid, &password).await) {
833+ crate::session::Action::Send(response) => {
834+ conn.send(response).await?;
835+ return Ok(Action::Continue);
836+ }
837+ _ => unreachable!(),
838+ }
839+ }
840+ crate::session::Action::Verify { address, cb } => {
841+ let verification = self
842+ .verification
843+ .as_ref()
844+ .expect("verification not available");
845+ match cb(verification.verify(&address).await) {
846+ crate::session::Action::Send(response) => {
847+ conn.send(response).await?;
848+ return Ok(Action::Continue);
849+ }
850+ _ => unreachable!(),
851+ }
852+ }
853+ crate::session::Action::Expand { address, cb } => {
854+ let expansion = self
855+ .list_expansion
856+ .as_ref()
857+ .expect("expansion not available");
858+ match cb(expansion.expand(&address).await) {
859+ crate::session::Action::Send(response) => {
860+ conn.send(response).await?;
861+ return Ok(Action::Continue);
862+ }
863+ _ => unreachable!(),
864+ }
865+ }
866+ crate::session::Action::StartTls(response) => {
867+ // Go ahead
868+ conn.send(response).await?;
869+ return Ok(Action::TlsUpgrade);
870+ }
871+ crate::session::Action::Quit(response) => {
872+ conn.send(response).await?;
873+ return Ok(Action::Shutdown);
874+ }
875 }
876 }
877- if starttls {
878- if tls_active {
879- tracing::warn!(
880- "Client attempted to upgrade to TLS but they already have TLS"
881- );
882- framed.send(crate::session::tls_already_active()).await?;
883- return Ok(Action::Continue);
884- }
885- Ok(Action::TlsUpgrade)
886- } else if shutdown {
887- Ok(Action::Shutdown)
888- } else {
889- Ok(Action::Continue)
890- }
891+ Ok(Action::Continue)
892 }
893- Ok(Some(Ok(Command::Payload(payload)))) => match session.handle_data(&payload).await {
894- Ok(responses) => {
895- for response in responses {
896- framed.send(response).await?;
897- }
898- queue.push(Envelope::from(&session.clone()));
899- Ok(Action::Continue)
900- }
901- Err(response) => {
902- tracing::warn!("Error handling message payload: {:?}", response);
903- framed.send(response).await?;
904- Ok(Action::Continue)
905- }
906- },
907+ Ok(Some(Ok(Command::Payload(_)))) => unreachable!(),
908 Ok(Some(Err(err))) => {
909- tracing::warn!("Client Error: {}", err);
910+ tracing::warn!("Transport Error: {}", err);
911 let response = match err {
912 crate::transport::TransportError::PipelineNotEnabled => {
913 crate::smtp_response!(500, 0, 0, 0, "Pipelining is not enabled")
914 @@ -288,65 +357,88 @@ impl Server {
915 // IO Errors considered fatal for the entire session
916 crate::transport::TransportError::Io(e) => return Err(ServerError::Io(e)),
917 };
918- framed.send(response).await?;
919+ conn.send(response).await?;
920 Ok(Action::Continue)
921 }
922 Ok(None) => Ok(Action::Shutdown),
923- Err(e) => {
924- tracing::warn!("Client connection exceeded: {:?}", self.global_timeout);
925- framed.send(crate::session::timeout(&e.to_string())).await?;
926+ Err(elapsed) => {
927+ tracing::warn!("Client timeout: {}", elapsed);
928+ conn.send(crate::session::timeout(&elapsed.to_string()))
929+ .await?;
930 Err(ServerError::Timeout(self.global_timeout.as_secs()))
931 }
932 }
933 }
934
935- /// Serve a plain SMTP connection that may be upgradable to TLS.
936 async fn serve_plain<T>(
937 &self,
938 stream: &mut T,
939 msg_queue: Arc<Injector<Envelope>>,
940- pipelining: bool,
941 remote_addr: SocketAddr,
942 ) -> Result<(), ServerError>
943 where
944 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin,
945 {
946- let mut session = Session::default()
947- .with_options(
948- self.options
949- .clone()
950- .unwrap_or_default()
951- .ip_addr(remote_addr.ip()),
952- )
953- .resolver(self.resolver.clone())
954- .spf_verification(self.spf_verification);
955-
956- let greeting = session.greeting();
957-
958- let transport = Transport::default().pipelining(pipelining);
959-
960- let mut framed = Framed::new(&mut *stream, transport.clone());
961+ let mut session = self
962+ .session
963+ .clone()
964+ .client_ip(remote_addr.ip())
965+ .starttls(self.tls_certificates.is_some())
966+ .expn_enabled(self.list_expansion.is_some());
967+
968+ let mut framed = Framed::new(
969+ &mut *stream,
970+ Transport::default().pipelining(self.pipelining),
971+ );
972+
973+ // initialize the connection with a greeting
974+ match session.next(None) {
975+ crate::session::Action::Send(response) => {
976+ framed.send(response).await?;
977+ }
978+ _ => unreachable!(),
979+ }
980
981- framed.send(greeting).await?;
982+ let framed = Arc::new(Mutex::new(framed));
983
984 loop {
985 match self
986- .next(&mut framed, &mut session, msg_queue.clone(), false)
987+ .on_frame(
988+ Plain {
989+ inner: framed.clone(),
990+ },
991+ &mut session,
992+ )
993 .await?
994 {
995 Action::Continue => {}
996+ Action::Enqueue => {
997+ msg_queue.push(session.envelope());
998+ }
999 Action::Shutdown => return Ok(()),
1000 Action::TlsUpgrade => {
1001 let acceptor = TlsAcceptor::from(Arc::new(self.rustls_config().await?));
1002 let tls_stream = acceptor.accept(&mut *stream).await?;
1003- let mut tls_framed =
1004- Framed::new(tls_stream, transport.clone());
1005+ let tls_framed =
1006+ Framed::new(tls_stream, Transport::default().pipelining(self.pipelining));
1007+ let tls_framed = Arc::new(Mutex::new(tls_framed));
1008+ // Per the RFC after TLS is established the session is
1009+ // reset.
1010+ let mut tls_session = session.clone().tls_active(true);
1011 loop {
1012 match self
1013- .next(&mut tls_framed, &mut session, msg_queue.clone(), true)
1014+ .on_frame(
1015+ Tls {
1016+ inner: tls_framed.clone(),
1017+ },
1018+ &mut tls_session,
1019+ )
1020 .await?
1021 {
1022 Action::Continue => {}
1023+ Action::Enqueue => {
1024+ msg_queue.push(session.envelope());
1025+ }
1026 Action::Shutdown => return Ok(()),
1027 Action::TlsUpgrade => unreachable!(),
1028 }
1029 @@ -406,7 +498,7 @@ impl Server {
1030 });
1031 }
1032
1033- pub async fn listen(&mut self) -> Result<(), Error> {
1034+ pub async fn listen(&mut self) -> Result<(), ServerError> {
1035 let listener = TcpListener::bind(&self.address).await?;
1036 tracing::info!("Mail server listening @ {}", self.address);
1037 self.resolver = if self.spf_verification || self.dkim_verification {
1038 @@ -440,13 +532,8 @@ impl Server {
1039 } else {
1040 addr
1041 };
1042- let pipelining = self
1043- .options
1044- .as_ref()
1045- .is_some_and(|opts| opts.capabilities & smtp_proto::EXT_PIPELINING != 0)
1046- || self.options.is_none();
1047 match self
1048- .serve_plain(&mut socket, global_queue.clone(), pipelining, addr)
1049+ .serve_plain(&mut socket, global_queue.clone(), addr)
1050 .await
1051 {
1052 Ok(_) => {
1053 @@ -463,8 +550,6 @@ impl Server {
1054 #[cfg(test)]
1055 mod test {
1056
1057- use crate::SessionOptions;
1058-
1059 use super::*;
1060
1061 use std::io;
1062 @@ -535,15 +620,13 @@ mod test {
1063 ],
1064 ..Default::default()
1065 };
1066- let server = Server::default()
1067- // turn off all extended capabilities
1068- .with_session_opts(SessionOptions::default().capabilities(0));
1069+ let server = Server::default();
1070+ // turn off all extended capabilities
1071 let global_queue = Arc::new(Injector::<Envelope>::new());
1072 server
1073 .serve_plain(
1074 &mut stream,
1075 global_queue.clone(),
1076- false,
1077 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 25)),
1078 )
1079 .await
1080 @@ -575,7 +658,6 @@ mod test {
1081 .serve_plain(
1082 &mut stream,
1083 global_queue.clone(),
1084- false,
1085 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 25)),
1086 )
1087 .await
1088 diff --git a/maitred/src/session.rs b/maitred/src/session.rs
1089index 8ec2e7c..51d1c2d 100644
1090--- a/maitred/src/session.rs
1091+++ b/maitred/src/session.rs
1092 @@ -1,24 +1,18 @@
1093+ use std::fmt::Display;
1094 use std::net::IpAddr;
1095- use std::rc::Rc;
1096- use std::result::Result as StdResult;
1097 use std::str::FromStr;
1098- use std::sync::Arc;
1099
1100 use bytes::Bytes;
1101 use email_address::EmailAddress;
1102
1103- use mail_auth::Resolver;
1104 use mail_parser::{Message, MessageParser};
1105 use smtp_proto::{EhloResponse, Request, Response as SmtpResponse};
1106- use tokio::sync::Mutex;
1107 use url::Host;
1108
1109- use crate::auth::{AuthData, PlainAuth};
1110- use crate::expand::Expansion;
1111+ use crate::auth::{AuthData, AuthError};
1112+ use crate::expand::ExpansionError;
1113 use crate::smtp_response;
1114- use crate::transport::Response;
1115- use crate::validation::Validation;
1116- use crate::verify::Verify;
1117+ use crate::verify::VerifyError;
1118
1119 /// Default help banner returned from a HELP command without any parameters
1120 pub const DEFAULT_HELP_BANNER: &str = r#"
1121 @@ -43,9 +37,168 @@ pub const DEFAULT_CAPABILITIES: u32 = smtp_proto::EXT_SIZE
1122 | smtp_proto::EXT_PIPELINING
1123 | smtp_proto::EXT_8BIT_MIME;
1124
1125+ #[derive(Debug, Clone)]
1126+ pub enum Response<T>
1127+ where
1128+ T: Display,
1129+ {
1130+ General(SmtpResponse<T>),
1131+ Ehlo(EhloResponse<T>),
1132+ }
1133+
1134+ impl Response<String> {
1135+ pub fn is_fatal(&self) -> bool {
1136+ match self {
1137+ Response::General(resp) => resp.code >= 500,
1138+ Response::Ehlo(_) => false,
1139+ }
1140+ }
1141+ }
1142+
1143+ impl<T> PartialEq for Response<T>
1144+ where
1145+ T: Display,
1146+ {
1147+ fn eq(&self, other: &Self) -> bool {
1148+ match self {
1149+ Response::General(req) => match other {
1150+ Response::General(other) => req.to_string() == other.to_string(),
1151+ Response::Ehlo(_) => false,
1152+ },
1153+ Response::Ehlo(req) => match other {
1154+ Response::General(_) => false,
1155+ Response::Ehlo(other) => {
1156+ // FIXME
1157+ req.capabilities == other.capabilities
1158+ && req.hostname.to_string() == other.hostname.to_string()
1159+ && req.deliver_by == other.deliver_by
1160+ && req.size == other.size
1161+ && req.auth_mechanisms == other.auth_mechanisms
1162+ && req.future_release_datetime.eq(&req.future_release_datetime)
1163+ && req.future_release_interval.eq(&req.future_release_interval)
1164+ }
1165+ },
1166+ }
1167+ }
1168+ }
1169+
1170+ impl<T> Eq for Response<T> where T: Display {}
1171+
1172+ /// An Envelope containing an e-mail message created from the session
1173+ #[derive(Clone, Debug)]
1174+ pub struct Envelope {
1175+ pub body: Message<'static>,
1176+ pub mail_from: EmailAddress,
1177+ pub rcpt_to: Vec<EmailAddress>,
1178+ pub hostname: Host,
1179+ }
1180+
1181+ pub enum Action<'a> {
1182+ Send(Response<String>),
1183+ SendMany(Vec<Response<String>>),
1184+ BDat {
1185+ initial_response: Response<String>,
1186+ cb: Box<dyn FnOnce(Bytes) -> Action<'a> + 'a>,
1187+ },
1188+ Data {
1189+ initial_response: Response<String>,
1190+ cb: Box<dyn FnOnce(Bytes) -> Action<'a> + 'a>,
1191+ },
1192+ SpfVerification {
1193+ ip_addr: IpAddr,
1194+ helo_domain: String,
1195+ host_domain: String,
1196+ mail_from: EmailAddress,
1197+ cb: Box<dyn FnOnce(bool) -> Action<'a> + 'a>,
1198+ },
1199+ PlainAuth {
1200+ authcid: String,
1201+ authzid: String,
1202+ password: String,
1203+ cb: Box<dyn FnOnce(Result<(), AuthError>) -> Action<'a> + 'a>,
1204+ },
1205+ Verify {
1206+ address: EmailAddress,
1207+ cb: Box<dyn FnOnce(Result<(), VerifyError>) -> Action<'a> + 'a>,
1208+ },
1209+ Expand {
1210+ address: String,
1211+ cb: Box<dyn FnOnce(Result<Vec<EmailAddress>, ExpansionError>) -> Action<'a> + 'a>,
1212+ },
1213+ StartTls(Response<String>),
1214+ Quit(Response<String>),
1215+ }
1216+
1217+ impl Display for Action<'_> {
1218+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1219+ match self {
1220+ Action::Send(response) => match response {
1221+ Response::General(response) => {
1222+ f.write_fmt(format_args!("Send:{}", &response.to_string()))
1223+ }
1224+ Response::Ehlo(ehlo_response) => {
1225+ f.write_fmt(format_args!("Send:{:?}", ehlo_response))
1226+ }
1227+ },
1228+ Action::SendMany(vec) => {
1229+ f.write_str("Send Many:\n")?;
1230+ vec.iter().for_each(|message| match message {
1231+ Response::General(response) => f
1232+ .write_fmt(format_args!("{}\n", &response.to_string()))
1233+ .unwrap(),
1234+ Response::Ehlo(_ehlo_response) => unreachable!(),
1235+ });
1236+ Ok(())
1237+ }
1238+ Action::BDat {
1239+ initial_response,
1240+ cb: _,
1241+ } => match initial_response {
1242+ Response::General(response) => f.write_fmt(format_args!("BDat:\n{}", response)),
1243+ Response::Ehlo(_ehlo_response) => unreachable!(),
1244+ },
1245+ Action::Data {
1246+ initial_response,
1247+ cb: _,
1248+ } => match initial_response {
1249+ Response::General(response) => f.write_fmt(format_args!("Data:\n{}", response)),
1250+ Response::Ehlo(_ehlo_response) => unreachable!(),
1251+ },
1252+ Action::SpfVerification {
1253+ ip_addr,
1254+ helo_domain,
1255+ host_domain,
1256+ mail_from,
1257+ cb: _,
1258+ } => f.write_fmt(format_args!(
1259+ "Spf: ip={}, domain={}, us={}, mail={}",
1260+ ip_addr, helo_domain, host_domain, mail_from
1261+ )),
1262+ Action::PlainAuth {
1263+ authcid,
1264+ authzid,
1265+ password: _,
1266+ cb: _,
1267+ } => f.write_fmt(format_args!("Plain Auth: {} {}", authcid, authzid)),
1268+ Action::Verify { address, cb: _ } => f.write_fmt(format_args!("Verify: {}", address)),
1269+ Action::Expand { address, cb: _ } => f.write_fmt(format_args!("Expand: {}", address)),
1270+ Action::StartTls(response) => match response {
1271+ Response::General(response) => f.write_str(&response.to_string()),
1272+ Response::Ehlo(_ehlo_response) => unreachable!(),
1273+ },
1274+ Action::Quit(response) => match response {
1275+ Response::General(response) => {
1276+ f.write_fmt(format_args!("Quit: {}", &response.to_string()))
1277+ }
1278+ Response::Ehlo(_ehlo_response) => unreachable!(),
1279+ },
1280+ }
1281+ }
1282+ }
1283+
1284 /// Result generated as part of an SMTP session, an Err indicates a session
1285 /// level error that will be returned to the client.
1286- pub type Result = StdResult<Vec<Response<String>>, Response<String>>;
1287+ // pub type Result = StdResult<Action, Response<String>>;
1288
1289 /// If the session was started with HELO or ELHO.
1290 #[derive(Clone)]
1291 @@ -54,13 +207,6 @@ enum Mode {
1292 Extended,
1293 }
1294
1295- /// Type of data transfer mode in use.
1296- #[derive(Clone)]
1297- enum DataTransfer {
1298- Data,
1299- Bdat,
1300- }
1301-
1302 /// Sent when the connection exceeds the maximum configured timeout
1303 pub fn timeout(message: &str) -> Response<String> {
1304 smtp_response!(421, 4, 4, 2, format!("Timeout exceeded: {}", message))
1305 @@ -125,41 +271,77 @@ fn parse_host(host: &str) -> String {
1306 }
1307 }
1308
1309- /// Session level options that configure individual SMTP transactions
1310+ /// session runtime flags
1311+ #[derive(Clone, Default)]
1312+ struct Flags {
1313+ authentication: bool,
1314+ starttls: bool,
1315+ vrfy: bool,
1316+ expn: bool,
1317+ spf: bool,
1318+ }
1319+
1320+ /// State machine that corresponds to a single SMTP session, calls to next
1321+ /// return actions that the caller is expected to implement in a transport.
1322 #[derive(Clone)]
1323- pub struct SessionOptions {
1324- pub our_hostname: String,
1325- pub maximum_size: u64,
1326- pub capabilities: u32,
1327- pub help_banner: String,
1328- pub greeting: String,
1329- pub list_expansion: Option<Arc<dyn Expansion>>,
1330- pub verification: Option<Arc<dyn Verify>>,
1331- pub plain_auth: Option<Arc<dyn PlainAuth>>,
1332- pub ip_addr: Option<IpAddr>,
1333- pub starttls_enabled: Option<bool>,
1334+ pub struct Session {
1335+ /// message body
1336+ pub body: Option<Message<'static>>,
1337+ /// mailto address
1338+ pub mail_from: Option<EmailAddress>,
1339+ /// rcpt address
1340+ pub rcpt_to: Option<Vec<EmailAddress>>,
1341+ pub hostname: Option<Host>,
1342+ initialized: Option<Mode>,
1343+ // previously ran commands
1344+ // TODO pipeline still partially broken
1345+ history: Vec<Request<String>>,
1346+
1347+ // session opts
1348+ our_hostname: Option<String>, // required
1349+ client_ip: Option<IpAddr>,
1350+ maximum_size: u64,
1351+ capabilities: u32,
1352+ help_banner: String,
1353+ greeting: String,
1354+ tls_active: bool,
1355+
1356+ spf_verified_host: Option<String>,
1357+ authenticated_id: Option<String>,
1358+ flags: Flags,
1359 }
1360
1361- impl Default for SessionOptions {
1362+ impl Default for Session {
1363 fn default() -> Self {
1364- SessionOptions {
1365- our_hostname: String::default(),
1366+ Session {
1367+ body: None,
1368+ mail_from: None,
1369+ rcpt_to: None,
1370+ hostname: None,
1371+ initialized: None,
1372+ history: Vec::new(),
1373+ our_hostname: None,
1374+ client_ip: None,
1375 maximum_size: DEFAULT_MAXIMUM_MESSAGE_SIZE,
1376 capabilities: DEFAULT_CAPABILITIES,
1377 help_banner: DEFAULT_HELP_BANNER.to_string(),
1378 greeting: DEFAULT_GREETING.to_string(),
1379- list_expansion: None,
1380- verification: None,
1381- plain_auth: None,
1382- ip_addr: None,
1383- starttls_enabled: None,
1384+ tls_active: false,
1385+ spf_verified_host: None,
1386+ authenticated_id: None,
1387+ flags: Flags::default(),
1388 }
1389 }
1390 }
1391
1392- impl SessionOptions {
1393+ impl Session {
1394 pub fn our_hostname(mut self, hostname: &str) -> Self {
1395- self.our_hostname = hostname.to_string();
1396+ self.our_hostname = Some(hostname.to_string());
1397+ self
1398+ }
1399+
1400+ pub fn spf_verification(mut self, verify_spf: bool) -> Self {
1401+ self.flags.spf = verify_spf;
1402 self
1403 }
1404
1405 @@ -178,82 +360,40 @@ impl SessionOptions {
1406 self
1407 }
1408
1409- pub fn starttls_enabled(mut self, enabled: bool) -> Self {
1410- if enabled {
1411- self.capabilities |= smtp_proto::EXT_START_TLS;
1412- }
1413- self.starttls_enabled = Some(enabled);
1414+ pub fn greeting_banner(mut self, greeting: &str) -> Self {
1415+ self.greeting = greeting.to_string();
1416 self
1417 }
1418
1419- pub fn list_expansion<T>(mut self, expansion: T) -> Self
1420- where
1421- T: crate::expand::Expansion + 'static,
1422- {
1423- self.list_expansion = Some(Arc::new(expansion));
1424- self
1425- }
1426-
1427- pub fn verification<T>(mut self, verification: T) -> Self
1428- where
1429- T: crate::verify::Verify + 'static,
1430- {
1431- self.verification = Some(Arc::new(verification));
1432+ pub fn authentication(mut self, enabled: bool) -> Self {
1433+ self.flags.authentication = enabled;
1434+ self.capabilities |= smtp_proto::EXT_AUTH;
1435 self
1436 }
1437
1438- pub fn plain_auth<T>(mut self, plain_auth: T) -> Self
1439- where
1440- T: crate::auth::PlainAuth + 'static,
1441- {
1442- self.capabilities |= smtp_proto::EXT_AUTH;
1443- self.plain_auth = Some(Arc::new(plain_auth));
1444+ pub fn client_ip(mut self, client_ip: IpAddr) -> Self {
1445+ self.client_ip = Some(client_ip);
1446 self
1447 }
1448
1449- pub fn ip_addr(mut self, ip_addr: IpAddr) -> Self {
1450- self.ip_addr = Some(ip_addr);
1451+ pub fn starttls(mut self, enabled: bool) -> Self {
1452+ self.flags.starttls = enabled;
1453+ self.capabilities |= smtp_proto::EXT_START_TLS;
1454 self
1455 }
1456- }
1457-
1458- /// Stateful connection that coresponds to a single SMTP session.
1459- #[derive(Clone, Default)]
1460- pub(crate) struct Session {
1461- /// message body
1462- pub body: Option<Message<'static>>,
1463- /// mailto address
1464- pub mail_from: Option<EmailAddress>,
1465- /// rcpt address
1466- pub rcpt_to: Option<Vec<EmailAddress>>,
1467- pub hostname: Option<Host>,
1468- // If an active data transfer is taking place
1469- data_transfer: Option<DataTransfer>,
1470- initialized: Option<Mode>,
1471- spf_verification: bool,
1472- auth_initialized: bool,
1473- // session options
1474- opts: Rc<SessionOptions>,
1475- // previously ran commands
1476- // TODO pipeline still partially broken
1477- history: Vec<Request<String>>,
1478- resolver: Option<Arc<Mutex<Resolver>>>,
1479- }
1480
1481- impl Session {
1482- pub fn spf_verification(mut self, verify_spf: bool) -> Self {
1483- self.spf_verification = verify_spf;
1484+ pub fn vrfy_enabled(mut self, enabled: bool) -> Self {
1485+ self.flags.vrfy = enabled;
1486 self
1487 }
1488
1489- pub fn resolver(mut self, resolver: Option<Arc<Mutex<Resolver>>>) -> Self {
1490- self.resolver = resolver;
1491+ pub fn expn_enabled(mut self, enabled: bool) -> Self {
1492+ self.flags.expn = enabled;
1493 self
1494 }
1495
1496- /// Configure a session with various options that effect it's behavior.
1497- pub fn with_options(mut self, opts: SessionOptions) -> Self {
1498- self.opts = Rc::new(opts);
1499+ pub fn tls_active(mut self, active: bool) -> Self {
1500+ self.tls_active = active;
1501 self
1502 }
1503
1504 @@ -265,18 +405,24 @@ impl Session {
1505 self.rcpt_to = None;
1506 // FIXME: is the hostname reset?
1507 // self.hostname = None;
1508- self.data_transfer = None;
1509 self.history = Vec::new();
1510+ self.spf_verified_host = None;
1511 }
1512+
1513 /// A greeting must be sent at the start of an SMTP connection when it is
1514 /// first initialized.
1515+ /// FIXME
1516 pub fn greeting(&self) -> Response<String> {
1517 smtp_response!(
1518 220,
1519 2,
1520 0,
1521 0,
1522- format!("{} {}", self.opts.our_hostname, self.opts.greeting)
1523+ format!(
1524+ "{} {}",
1525+ self.our_hostname.clone().expect("hostname not configured"),
1526+ self.greeting
1527+ )
1528 )
1529 }
1530
1531 @@ -285,11 +431,11 @@ impl Session {
1532 self.initialized
1533 .as_ref()
1534 .is_some_and(|mode| matches!(mode, Mode::Extended))
1535- && self.opts.capabilities & capability != 0
1536+ && self.capabilities & capability != 0
1537 }
1538
1539 /// Ensure that the session has been initialized otherwise return an error
1540- fn check_initialized(&self) -> StdResult<(), Response<String>> {
1541+ fn check_initialized(&self) -> Result<(), Response<String>> {
1542 if self.initialized.is_none() {
1543 return Err(smtp_response!(
1544 500,
1545 @@ -303,274 +449,328 @@ impl Session {
1546 }
1547
1548 /// checks if 8BITMIME is supported
1549- fn check_body(&self, body: &[u8]) -> StdResult<(), Response<String>> {
1550+ fn check_body(&self, body: &[u8]) -> Result<(), Response<String>> {
1551 if !self.has_capability(smtp_proto::EXT_8BIT_MIME) && !body.is_ascii() {
1552 return Err(smtp_response!(
1553 500,
1554 0,
1555 0,
1556 0,
1557- "Non ascii characters found in message body"
1558+ "Non ASCII characters found in message body"
1559 ));
1560 }
1561 Ok(())
1562 }
1563
1564- pub async fn handle_data(&mut self, data: &Bytes) -> Result {
1565- self.check_initialized()?;
1566- let transfer_mode = self
1567- .data_transfer
1568- .as_ref()
1569- .expect("transfer is not initalized");
1570- match transfer_mode {
1571- DataTransfer::Data => {
1572- let message_payload = data.to_vec();
1573- self.check_body(&message_payload)?;
1574- let parser = MessageParser::new();
1575- match parser.parse(&message_payload) {
1576- Some(msg) => {
1577- self.body = Some(msg.into_owned());
1578- self.data_transfer = None;
1579- Ok(vec![smtp_response!(250, 0, 0, 0, "OK")])
1580- }
1581- None => {
1582- self.data_transfer = None;
1583- Ok(vec![smtp_response!(
1584- 500,
1585- 0,
1586- 0,
1587- 0,
1588- "Cannot parse message payload".to_string()
1589- )])
1590- }
1591- }
1592- }
1593- DataTransfer::Bdat => {
1594- let message_payload = data.to_vec();
1595- self.check_body(&message_payload)?;
1596- let parser = MessageParser::new();
1597- match parser.parse(&message_payload) {
1598- Some(msg) => {
1599- self.body = Some(msg.into_owned());
1600- self.data_transfer = None;
1601- Ok(vec![smtp_response!(250, 0, 0, 0, "OK")])
1602- }
1603- None => {
1604- self.data_transfer = None;
1605- Ok(vec![smtp_response!(
1606- 500,
1607- 0,
1608- 0,
1609- 0,
1610- "Cannot parse message payload".to_string()
1611- )])
1612- }
1613- }
1614- }
1615+ pub fn envelope(&self) -> Envelope {
1616+ Envelope {
1617+ body: self.body.clone().unwrap(),
1618+ mail_from: self.mail_from.clone().unwrap(),
1619+ rcpt_to: self.rcpt_to.clone().unwrap(),
1620+ hostname: self.hostname.clone().unwrap(),
1621 }
1622 }
1623
1624- /// Statefully process the SMTP command with optional data payload, any
1625- /// error returned is passed back to the caller.
1626- /// NOTE:
1627- /// Data transfers are detected in the transport level and handled by two
1628- /// calls to process(). The first call contains the empty data request to
1629- /// indicate that the process is starting and the second one contains the
1630- /// parsed bytes from the transfer.
1631- /// FIXME: Not at all reasonable yet
1632- pub async fn process(&mut self, req: &Request<String>) -> Result {
1633- self.history.push(req.clone());
1634+ /// Process the SMTP command returning the action sometimes with a callback
1635+ /// that the implementor needs to take.
1636+ pub fn next(&mut self, req: Option<&Request<String>>) -> Action<'_> {
1637+ if let Some(req) = req {
1638+ self.history.push(req.clone());
1639+ }
1640 match req {
1641- Request::Ehlo { host } => {
1642- self.hostname = Some(
1643- Host::parse(&parse_host(host))
1644- .map_err(|e| smtp_response!(500, 0, 0, 0, e.to_string()))?,
1645- );
1646+ None => {
1647+ tracing::info!("Sending initial greeting");
1648+ Action::Send(smtp_response!(
1649+ 220,
1650+ 2,
1651+ 0,
1652+ 0,
1653+ format!(
1654+ "{} {}",
1655+ self.our_hostname.clone().unwrap_or_default(),
1656+ self.greeting
1657+ )
1658+ ))
1659+ }
1660+ Some(Request::Ehlo { host }) => {
1661+ match Host::parse(&parse_host(host)) {
1662+ Ok(hostname) => {
1663+ self.hostname = Some(hostname);
1664+ }
1665+ Err(e) => return Action::Send(smtp_response!(500, 0, 0, 0, e.to_string())),
1666+ };
1667 self.reset();
1668 self.initialized = Some(Mode::Extended);
1669 let mut resp = EhloResponse::new(format!("Hello {}", host));
1670- resp.capabilities = self.opts.capabilities;
1671- resp.size = self.opts.maximum_size as usize;
1672- if self.opts.plain_auth.is_some() {
1673+ resp.capabilities = self.capabilities;
1674+ resp.size = self.maximum_size as usize;
1675+ if self.flags.authentication {
1676 resp.auth_mechanisms = smtp_proto::AUTH_PLAIN;
1677 }
1678- Ok(vec![Response::Ehlo(resp)])
1679+ Action::Send(Response::Ehlo(resp))
1680 }
1681- Request::Lhlo { host } => {
1682- self.hostname = Some(
1683- Host::parse(&parse_host(host)).map_err(|e| smtp_response!(500, 0, 0, 0, e))?,
1684- );
1685+ Some(Request::Lhlo { host }) => {
1686+ match Host::parse(&parse_host(host)) {
1687+ Ok(hostname) => {
1688+ self.hostname = Some(hostname);
1689+ }
1690+ Err(e) => return Action::Send(smtp_response!(500, 0, 0, 0, e.to_string())),
1691+ };
1692 self.reset();
1693 self.initialized = Some(Mode::Legacy);
1694- Ok(vec![smtp_response!(
1695- 250,
1696- 0,
1697- 0,
1698- 0,
1699- format!("Hello {}", host)
1700- )])
1701+ Action::Send(smtp_response!(250, 0, 0, 0, format!("Hello {}", host)))
1702 }
1703- Request::Helo { host } => {
1704- self.hostname = Some(
1705- Host::parse(&parse_host(host))
1706- .map_err(|e| smtp_response!(500, 0, 0, 0, e.to_string()))?,
1707- );
1708+ Some(Request::Helo { host }) => {
1709+ match Host::parse(&parse_host(host)) {
1710+ Ok(hostname) => {
1711+ self.hostname = Some(hostname);
1712+ }
1713+ Err(e) => return Action::Send(smtp_response!(500, 0, 0, 0, e.to_string())),
1714+ };
1715 self.reset();
1716 self.initialized = Some(Mode::Legacy);
1717- Ok(vec![smtp_response!(
1718- 250,
1719- 0,
1720- 0,
1721- 0,
1722- format!("Hello {}", host)
1723- )])
1724+ Action::Send(smtp_response!(250, 0, 0, 0, format!("Hello {}", host)))
1725 }
1726- Request::Mail { from } => {
1727- self.check_initialized()?;
1728- let mail_from = EmailAddress::from_str(from.address.as_str()).map_err(|e| {
1729- smtp_response!(
1730- 500,
1731- 0,
1732- 0,
1733- 0,
1734- format!("cannot parse: {} {}", from.address, e)
1735- )
1736- })?;
1737+ Some(Request::Mail { from }) => {
1738+ if let Some(err) = self.check_initialized().err() {
1739+ return Action::Send(err);
1740+ }
1741+ let mail_from = match EmailAddress::from_str(&from.address) {
1742+ Ok(addr) => addr,
1743+ Err(e) => {
1744+ return Action::Send(smtp_response!(
1745+ 500,
1746+ 0,
1747+ 0,
1748+ 0,
1749+ format!("cannot parse: {} {}", from.address, e)
1750+ ))
1751+ }
1752+ };
1753 self.mail_from = Some(mail_from.clone());
1754- if self.spf_verification {
1755+ if self.flags.spf {
1756 tracing::info!("Running SPF Validation");
1757- let ip_addr = self.opts.ip_addr.ok_or(smtp_response!(
1758- 500,
1759- 0,
1760- 0,
1761- 0,
1762- "Client has no IP Address"
1763- ))?;
1764- let helo_domain = self
1765- .hostname
1766+ let ip_addr = match self.client_ip {
1767+ Some(ip_addr) => ip_addr,
1768+ None => {
1769+ return Action::Send(smtp_response!(
1770+ 500,
1771+ 0,
1772+ 0,
1773+ 0,
1774+ "Client has no IP Address"
1775+ ))
1776+ }
1777+ };
1778+ let helo_domain = match &self.hostname {
1779+ Some(helo_domain) => helo_domain.to_string(),
1780+ None => {
1781+ return Action::Send(smtp_response!(
1782+ 500,
1783+ 0,
1784+ 0,
1785+ 0,
1786+ "hostname is not specified"
1787+ ))
1788+ }
1789+ };
1790+ let host_domain = self
1791+ .our_hostname
1792 .clone()
1793- .ok_or(smtp_response!(500, 0, 0, 0, "hostname is not specified"))?
1794- .to_string();
1795- let our_domain = self.opts.our_hostname.clone();
1796- let resolver = self.resolver.as_ref().expect("Resolver not configured");
1797- let resolver = resolver.lock().await;
1798- let pass = Validation(resolver)
1799- .verify_spf(ip_addr, &helo_domain, &our_domain, mail_from.as_str())
1800- .await;
1801- if !pass {
1802- return Err(smtp_response!(500, 0, 0, 0, "SPF Verification Failed"));
1803+ .expect("session hostname not specified");
1804+ let inner = self;
1805+ Action::SpfVerification {
1806+ ip_addr,
1807+ helo_domain: helo_domain.clone(),
1808+ host_domain,
1809+ mail_from: mail_from.clone(),
1810+ cb: Box::new(move |success| {
1811+ if success {
1812+ inner.spf_verified_host = Some(helo_domain.clone());
1813+ Action::Send(smtp_response!(250, 0, 0, 0, "OK"))
1814+ } else {
1815+ Action::Send(smtp_response!(
1816+ 500,
1817+ 0,
1818+ 0,
1819+ 0,
1820+ "SPF Verification Failed"
1821+ ))
1822+ }
1823+ }),
1824 }
1825+ } else {
1826+ Action::Send(smtp_response!(250, 0, 0, 0, "OK"))
1827 }
1828- Ok(vec![smtp_response!(250, 0, 0, 0, "OK")])
1829 }
1830- Request::Rcpt { to } => {
1831- self.check_initialized()?;
1832- let rcpt_to = EmailAddress::from_str(to.address.as_str()).map_err(|e| {
1833- smtp_response!(500, 0, 0, 0, format!("cannot parse: {} {}", to.address, e))
1834- })?;
1835+ Some(Request::Rcpt { to }) => {
1836+ if let Some(err) = self.check_initialized().err() {
1837+ return Action::Send(err);
1838+ }
1839+ let rcpt_to = match EmailAddress::from_str(to.address.as_str()) {
1840+ Ok(rcpt_to) => rcpt_to,
1841+ Err(e) => {
1842+ return Action::Send(smtp_response!(
1843+ 500,
1844+ 0,
1845+ 0,
1846+ 0,
1847+ format!("cannot parse: {} {}", to.address, e)
1848+ ))
1849+ }
1850+ };
1851 if let Some(ref mut rcpts) = self.rcpt_to {
1852 rcpts.push(rcpt_to.clone());
1853 } else {
1854 self.rcpt_to = Some(vec![rcpt_to.clone()]);
1855 }
1856- Ok(vec![smtp_response!(250, 0, 0, 0, "OK")])
1857+ Action::Send(smtp_response!(250, 0, 0, 0, "OK"))
1858 }
1859- Request::Bdat {
1860+ Some(Request::Bdat {
1861 chunk_size: _,
1862 is_last: _,
1863- } => {
1864- self.check_initialized()?;
1865- tracing::info!("Initializing data transfer mode");
1866- self.data_transfer = Some(DataTransfer::Bdat);
1867- Ok(vec![smtp_response!(
1868- 354,
1869- 0,
1870- 0,
1871- 0,
1872- "Starting BDAT data transfer".to_string()
1873- )])
1874+ }) => {
1875+ if let Some(err) = self.check_initialized().err() {
1876+ return Action::Send(err);
1877+ }
1878+ let inner = self;
1879+ tracing::info!("Starting binary data transfer");
1880+ Action::BDat {
1881+ initial_response: smtp_response!(
1882+ 354,
1883+ 0,
1884+ 0,
1885+ 0,
1886+ "Starting BDAT data transfer".to_string()
1887+ ),
1888+ cb: Box::new(move |payload| {
1889+ let copied = payload.to_vec();
1890+ if let Err(response) = inner.check_body(&copied) {
1891+ return Action::Send(response);
1892+ };
1893+ let parser = MessageParser::new();
1894+ match parser.parse(&copied) {
1895+ Some(message) => {
1896+ inner.body = Some(message.into_owned());
1897+ Action::Send(smtp_response!(250, 0, 0, 0, "OK"))
1898+ }
1899+ None => Action::Send(smtp_response!(
1900+ 500,
1901+ 0,
1902+ 0,
1903+ 0,
1904+ "Cannot parse message payload"
1905+ )),
1906+ }
1907+ }),
1908+ }
1909 }
1910 // After an AUTH command has been successfully completed, no more
1911 // AUTH commands may be issued in the same session. After a
1912 // successful AUTH command completes, a server MUST reject any
1913 // further AUTH commands with a 503 reply.
1914- Request::Auth {
1915+ Some(Request::Auth {
1916 mechanism,
1917 initial_response,
1918- } => {
1919- if let Some(auth_fn) = &self.opts.plain_auth {
1920+ }) => {
1921+ if let Some(err) = self.check_initialized().err() {
1922+ return Action::Send(err);
1923+ }
1924+ if self.flags.authentication {
1925 if *mechanism != smtp_proto::AUTH_PLAIN {
1926 // only plain auth is supported
1927- return Err(smtp_response!(504, 5, 5, 4, "Auth Not Supported"));
1928+ return Action::Send(smtp_response!(504, 5, 5, 4, "Auth Not Supported"));
1929+ }
1930+ let auth_data = match AuthData::try_from(initial_response.as_str()) {
1931+ Ok(auth_data) => auth_data,
1932+ Err(e) => return Action::Send(e.into()),
1933+ };
1934+ // TODO: Let the auth callback return this instead
1935+ let authcid = auth_data.authcid().clone();
1936+ let inner = self;
1937+ Action::PlainAuth {
1938+ authcid: auth_data.authcid(),
1939+ authzid: auth_data.authzid(),
1940+ password: auth_data.passwd(),
1941+ cb: Box::new(move |result| match result {
1942+ Ok(_) => {
1943+ tracing::info!("Successfully authenticated");
1944+ inner.authenticated_id = Some(authcid);
1945+ Action::Send(smtp_response!(235, 2, 7, 0, "OK"))
1946+ }
1947+ Err(e) => Action::Send(e.into()),
1948+ }),
1949 }
1950- let auth_data =
1951- AuthData::try_from(initial_response.as_str()).map_err(|e| e.into())?;
1952-
1953- auth_fn
1954- .authenticate(
1955- &auth_data.authcid(),
1956- &auth_data.authzid(),
1957- &auth_data.passwd(),
1958- )
1959- .await
1960- .map_err(|e| e.into())?;
1961-
1962- tracing::info!("Successfully authenticated");
1963-
1964- self.auth_initialized = true;
1965-
1966- Ok(vec![smtp_response!(235, 2, 7, 0, "OK")])
1967 } else {
1968- Err(smtp_response!(504, 5, 5, 4, "Auth Not Supported"))
1969+ Action::Send(smtp_response!(504, 5, 5, 4, "Auth Not Supported"))
1970 }
1971 }
1972- Request::Noop { value: _ } => {
1973- self.check_initialized()?;
1974- Ok(vec![smtp_response!(250, 0, 0, 0, "OK".to_string())])
1975+ Some(Request::Noop { value: _ }) => {
1976+ if let Some(err) = self.check_initialized().err() {
1977+ return Action::Send(err);
1978+ }
1979+ Action::Send(smtp_response!(250, 0, 0, 0, "OK".to_string()))
1980 }
1981- Request::Vrfy { value } => {
1982- if let Some(verifier) = &self.opts.verification {
1983- let address = EmailAddress::from_str(value.as_str()).map_err(|e| {
1984- smtp_response!(500, 0, 0, 0, format!("cannot parse: {} {}", value, e))
1985- })?;
1986- match verifier.verify(&address).await {
1987- Ok(_) => Ok(vec![smtp_response!(250, 0, 0, 0, "OK".to_string())]),
1988- Err(e) => Err(smtp_response!(500, 0, 0, 0, e.to_string())),
1989+ Some(Request::Vrfy { value }) => {
1990+ if let Some(err) = self.check_initialized().err() {
1991+ return Action::Send(err);
1992+ }
1993+ if self.flags.vrfy {
1994+ let address = match EmailAddress::from_str(value) {
1995+ Ok(addr) => addr,
1996+ Err(e) => {
1997+ return Action::Send(smtp_response!(
1998+ 500,
1999+ 0,
2000+ 0,
2001+ 0,
2002+ format!("cannot parse: {} {}", value, e)
2003+ ))
2004+ }
2005+ };
2006+ Action::Verify {
2007+ address,
2008+ cb: Box::new(move |result| match result {
2009+ Ok(_) => Action::Send(smtp_response!(200, 0, 0, 0, "OK")),
2010+ Err(e) => Action::Send(e.into()),
2011+ }),
2012 }
2013 } else {
2014- Err(smtp_response!(500, 0, 0, 0, "No such address"))
2015+ Action::Send(smtp_response!(500, 0, 0, 0, "VRFY Unavailable"))
2016 }
2017 }
2018- Request::Expn { value } => {
2019- if let Some(expn) = &self.opts.list_expansion {
2020- match expn.expand(value).await {
2021- Ok(addresses) => {
2022- let mut result = vec![smtp_response!(250, 0, 0, 0, "OK")];
2023- result.extend(
2024- addresses
2025- .iter()
2026- .map(|addr| smtp_response!(250, 0, 0, 0, addr.to_string())),
2027- );
2028- Ok(result)
2029- }
2030- Err(e) => Err(smtp_response!(500, 0, 0, 0, e.to_string())),
2031+ Some(Request::Expn { value }) => {
2032+ if let Some(err) = self.check_initialized().err() {
2033+ return Action::Send(err);
2034+ }
2035+ if self.flags.expn && self.authenticated_id.is_some() {
2036+ Action::Expand {
2037+ address: value.clone(),
2038+ cb: Box::new(move |result| match result {
2039+ Ok(addresses) => {
2040+ let mut responses = vec![smtp_response!(250, 0, 0, 0, "OK")];
2041+ responses.extend(
2042+ addresses
2043+ .iter()
2044+ .map(|addr| smtp_response!(250, 0, 0, 0, addr.to_string())),
2045+ );
2046+ Action::SendMany(responses)
2047+ }
2048+ Err(e) => Action::Send(e.into()),
2049+ }),
2050 }
2051 } else {
2052- Err(smtp_response!(500, 0, 0, 0, "Server does not support EXPN"))
2053+ Action::Send(smtp_response!(500, 0, 0, 0, "EXPN Unavailable"))
2054 }
2055 }
2056- Request::Help { value } => {
2057- self.check_initialized()?;
2058+ Some(Request::Help { value }) => {
2059+ if let Some(err) = self.check_initialized().err() {
2060+ return Action::Send(err);
2061+ }
2062 if value.is_empty() {
2063- Ok(vec![smtp_response!(
2064- 250,
2065- 0,
2066- 0,
2067- 0,
2068- self.opts.help_banner.to_string()
2069- )])
2070+ Action::Send(smtp_response!(250, 0, 0, 0, self.help_banner))
2071 } else {
2072- Err(smtp_response!(
2073+ Action::Send(smtp_response!(
2074 500,
2075 0,
2076 0,
2077 @@ -579,18 +779,22 @@ impl Session {
2078 ))
2079 }
2080 }
2081- Request::Etrn { name: _ } => Err(smtp_response!(500, 0, 0, 0, "ETRN is not supported")),
2082- Request::Atrn { domains: _ } => {
2083- Err(smtp_response!(500, 0, 0, 0, "ATRN is not supported"))
2084+ Some(Request::Etrn { name: _ }) => {
2085+ Action::Send(smtp_response!(500, 0, 0, 0, "ETRN is not supported"))
2086 }
2087- Request::Burl { uri: _, is_last: _ } => {
2088- Err(smtp_response!(500, 0, 0, 0, "BURL is not supported"))
2089+ Some(Request::Atrn { domains: _ }) => {
2090+ Action::Send(smtp_response!(500, 0, 0, 0, "ATRN is not supported"))
2091 }
2092- Request::StartTls => {
2093- if self.opts.starttls_enabled.is_some_and(|enabled| enabled) {
2094- Ok(vec![smtp_response!(220, 0, 0, 0, "Go ahead")])
2095+ Some(Request::Burl { uri: _, is_last: _ }) => {
2096+ Action::Send(smtp_response!(500, 0, 0, 0, "BURL is not supported"))
2097+ }
2098+ Some(Request::StartTls) => {
2099+ if self.flags.starttls && !self.tls_active {
2100+ Action::StartTls(smtp_response!(220, 0, 0, 0, "Go ahead"))
2101+ } else if self.flags.starttls && self.tls_active {
2102+ Action::Send(tls_already_active())
2103 } else {
2104- Err(smtp_response!(
2105+ Action::Send(smtp_response!(
2106 500,
2107 0,
2108 0,
2109 @@ -599,428 +803,395 @@ impl Session {
2110 ))
2111 }
2112 }
2113- Request::Data => {
2114- self.check_initialized()?;
2115- tracing::info!("Initializing data transfer mode");
2116- self.data_transfer = Some(DataTransfer::Data);
2117- Ok(vec![smtp_response!(
2118- 354,
2119- 0,
2120- 0,
2121- 0,
2122- "Reading data input, end the message with <CRLF>.<CRLF>".to_string()
2123- )])
2124+ Some(Request::Data) => {
2125+ if let Some(err) = self.check_initialized().err() {
2126+ return Action::Send(err);
2127+ }
2128+ tracing::info!("Starting data transfer");
2129+ let inner = self;
2130+ Action::Data {
2131+ initial_response: smtp_response!(
2132+ 354,
2133+ 0,
2134+ 0,
2135+ 0,
2136+ "Reading data input, end the message with <CRLF>.<CRLF>".to_string()
2137+ ),
2138+ cb: Box::new(move |payload| {
2139+ let copied = payload.to_vec();
2140+ if let Err(response) = inner.check_body(&copied) {
2141+ return Action::Send(response);
2142+ };
2143+ let parser = MessageParser::new();
2144+ match parser.parse(&copied) {
2145+ Some(message) => {
2146+ inner.body = Some(message.into_owned());
2147+ Action::Send(smtp_response!(250, 0, 0, 0, "OK"))
2148+ }
2149+ None => Action::Send(smtp_response!(
2150+ 500,
2151+ 0,
2152+ 0,
2153+ 0,
2154+ "Cannot parse message payload"
2155+ )),
2156+ }
2157+ }),
2158+ }
2159 }
2160- Request::Rset => {
2161- self.check_initialized()?;
2162+ Some(Request::Rset) => {
2163+ if let Some(err) = self.check_initialized().err() {
2164+ return Action::Send(err);
2165+ }
2166 self.reset();
2167- Ok(vec![smtp_response!(200, 0, 0, 0, "".to_string())])
2168+ Action::Send(smtp_response!(200, 0, 0, 0, "".to_string()))
2169 }
2170- Request::Quit => Ok(vec![smtp_response!(221, 0, 0, 0, "Ciao!".to_string())]),
2171+ Some(Request::Quit) => Action::Quit(smtp_response!(221, 0, 0, 0, "Ciao!".to_string())),
2172 }
2173 }
2174 }
2175
2176 #[cfg(test)]
2177 mod test {
2178- use futures::stream::{self, StreamExt};
2179- use smtp_proto::{MailFrom, RcptTo};
2180- use tokio::sync::Mutex;
2181+
2182+ use base64::engine::general_purpose::STANDARD;
2183+ use base64::{prelude::*, DecodeError};
2184+ use smtp_proto::MailFrom;
2185
2186 use super::*;
2187
2188 const EXAMPLE_HOSTNAME: &str = "example.org";
2189
2190- struct TestCase {
2191- pub request: Request<String>,
2192- pub payload: Option<Bytes>,
2193- pub expected: Result,
2194- }
2195-
2196- /// process all commands returning their response
2197- async fn process_all(session: &Mutex<Session>, commands: &[TestCase]) {
2198- let stream = stream::iter(commands);
2199- stream.enumerate().for_each(|(i, command)| {
2200- async move {
2201- let mut session = session.lock().await;
2202- println!("Running command {}/{}", i, commands.len());
2203- let response = if let Some(payload) = &command.payload {
2204- session.handle_data(payload).await
2205- } else {
2206- session.process(&command.request).await
2207- };
2208- println!("Response: {:?}", response);
2209- match response {
2210- Ok(actual_response) => {
2211- match &command.expected {
2212- Ok(expected_response) => {
2213- if !actual_response.eq(expected_response) {
2214- panic!(
2215- "Unexpected response:\n\nActual: {:?}\nExpected: {:?}\n",
2216- actual_response, expected_response
2217- );
2218- }
2219- }
2220- Err(expected_err) => {
2221- panic!(
2222- "Expected an error but got valid response:\n\nResponse: {:?}\nExpected Error: {:?}",
2223- actual_response, expected_err
2224- );
2225- },
2226- }
2227- }
2228- Err(actual_err) => {
2229- match &command.expected {
2230- Ok(response) => {
2231- panic!(
2232- "Expected a valid response but got error:\nExpected: {:?}\nError: {:?}",
2233- response, actual_err,
2234- );
2235- },
2236- Err(expected_err) => {
2237- if !actual_err.eq(expected_err) {
2238- panic!("Expected error does not match:\n\nActual: {:?}\n Expected: {:?}", actual_err, expected_err);
2239- }
2240- },
2241+ fn equal(actual: &Action<'_>, expected: &Action<'_>) -> bool {
2242+ let is_equal = match actual {
2243+ Action::Send(response) => {
2244+ matches!(expected, Action::Send(other) if response.eq(other))
2245+ }
2246+ Action::SendMany(actual) => match expected {
2247+ Action::SendMany(expected) => actual.iter().enumerate().all(|(i, resp)| {
2248+ if let Some(expected_resp) = expected.get(i) {
2249+ resp.eq(expected_resp)
2250+ } else {
2251+ false
2252 }
2253- }
2254- };
2255+ }),
2256+ _ => false,
2257+ },
2258+ Action::BDat {
2259+ initial_response,
2260+ cb,
2261+ } => todo!(),
2262+ Action::Data {
2263+ initial_response,
2264+ cb,
2265+ } => todo!(),
2266+ Action::SpfVerification {
2267+ ip_addr,
2268+ helo_domain,
2269+ host_domain,
2270+ mail_from,
2271+ cb,
2272+ } => todo!(),
2273+ Action::PlainAuth {
2274+ authcid,
2275+ authzid,
2276+ password,
2277+ cb,
2278+ } => todo!(),
2279+ Action::Verify { address, cb } => todo!(),
2280+ Action::Expand { address, cb } => todo!(),
2281+ Action::StartTls(response) => todo!(),
2282+ Action::Quit(response) => {
2283+ matches!(expected, Action::Quit(other) if response.eq(other))
2284 }
2285- }).await;
2286+ };
2287+
2288+ if !is_equal {
2289+ println!("Responses Differ:");
2290+ println!("Expected:");
2291+ println!("{}", expected);
2292+ println!("Actual:");
2293+ println!("{}", actual);
2294+ return false;
2295+ };
2296+
2297+ true
2298 }
2299
2300- #[tokio::test]
2301- async fn test_hello_quit() {
2302- let requests = &[
2303- TestCase {
2304- request: Request::Helo {
2305- host: EXAMPLE_HOSTNAME.to_string(),
2306- },
2307- payload: None,
2308- expected: Ok(vec![smtp_response!(
2309- 250,
2310- 0,
2311- 0,
2312- 0,
2313- String::from("Hello example.org")
2314- )]),
2315- },
2316- TestCase {
2317- request: Request::Quit {},
2318- payload: None,
2319- expected: Ok(vec![smtp_response!(221, 0, 0, 0, String::from("Ciao!"))]),
2320- },
2321- ];
2322- let session = Mutex::new(Session::default());
2323- process_all(&session, requests).await;
2324- let session = session.lock().await;
2325- // session should contain both requests
2326+ #[test]
2327+ fn session_greeting() {
2328+ let mut session = Session::default();
2329+ assert!(matches!(session.next(None), Action::Send(_)))
2330+ }
2331+
2332+ #[test]
2333+ fn session_hello_quit() {
2334+ let mut session = Session::default();
2335+ assert!(equal(
2336+ &session.next(Some(&Request::Helo {
2337+ host: EXAMPLE_HOSTNAME.to_string(),
2338+ })),
2339+ &Action::Send(smtp_response!(
2340+ 250,
2341+ 0,
2342+ 0,
2343+ 0,
2344+ String::from("Hello example.org")
2345+ )),
2346+ ));
2347+ assert!(equal(
2348+ &session.next(Some(&Request::Quit {})),
2349+ &Action::Quit(smtp_response!(221, 0, 0, 0, String::from("Ciao!"))),
2350+ ));
2351+
2352 assert!(session
2353 .hostname
2354 .as_ref()
2355 .is_some_and(|hostname| hostname.to_string() == EXAMPLE_HOSTNAME));
2356 }
2357
2358- #[tokio::test]
2359- async fn test_command_with_no_hello() {
2360- let requests = &[TestCase {
2361- request: Request::Mail {
2362+ #[test]
2363+ fn session_command_with_no_helo() {
2364+ let mut session = Session::default();
2365+ assert!(equal(
2366+ &session.next(Some(&Request::Mail {
2367 from: MailFrom {
2368 address: String::from("fuu@example.org"),
2369 ..Default::default()
2370- },
2371- },
2372- payload: None,
2373- expected: Err(smtp_response!(
2374+ }
2375+ })),
2376+ &Action::Send(smtp_response!(
2377 500,
2378 5,
2379 5,
2380 1,
2381 String::from("It's polite to say EHLO first")
2382- )),
2383- }];
2384- let session = Mutex::new(
2385- Session::default()
2386- .with_options(SessionOptions::default().our_hostname(EXAMPLE_HOSTNAME)),
2387- );
2388- process_all(&session, requests).await;
2389+ ))
2390+ ))
2391 }
2392
2393- #[tokio::test]
2394- async fn test_expand() {
2395- let requests = &[
2396- TestCase {
2397- request: Request::Helo {
2398- host: EXAMPLE_HOSTNAME.to_string(),
2399- },
2400- payload: None,
2401- expected: Ok(vec![smtp_response!(
2402- 250,
2403- 0,
2404- 0,
2405- 0,
2406- String::from("Hello example.org")
2407- )]),
2408- },
2409- TestCase {
2410- request: Request::Expn {
2411- value: "mailing-list".to_string(),
2412- },
2413- payload: None,
2414- expected: Ok(vec![
2415- smtp_response!(250, 0, 0, 0, "OK"),
2416- smtp_response!(250, 0, 0, 0, "Fuu <fuu@bar.com>"),
2417- smtp_response!(250, 0, 0, 0, "Baz <baz@qux.com>"),
2418- ]),
2419- },
2420- TestCase {
2421- request: Request::Quit {},
2422- payload: None,
2423- expected: Ok(vec![smtp_response!(221, 0, 0, 0, String::from("Ciao!"))]),
2424- },
2425- ];
2426- let session = Mutex::new(Session::default().with_options(
2427- SessionOptions::default().list_expansion(crate::expand::ExpansionFunc(|name: &str| {
2428- let name = name.to_string();
2429- async move {
2430- assert!(name == "mailing-list");
2431- Ok(vec![
2432- EmailAddress::new_unchecked("Fuu <fuu@bar.com>"),
2433- EmailAddress::new_unchecked("Baz <baz@qux.com>"),
2434- ])
2435- }
2436+ #[test]
2437+ fn session_authenticate() {
2438+ let session = &mut Session::default().authentication(true);
2439+ assert!(equal(
2440+ &session.next(Some(&Request::Helo {
2441+ host: EXAMPLE_HOSTNAME.to_string(),
2442 })),
2443+ &Action::Send(smtp_response!(
2444+ 250,
2445+ 0,
2446+ 0,
2447+ 0,
2448+ String::from("Hello example.org")
2449+ )),
2450 ));
2451- process_all(&session, requests).await;
2452- // session should contain both requests
2453- let session = session.lock().await;
2454+
2455+ {
2456+ let auth = session.next(Some(&Request::Auth {
2457+ mechanism: smtp_proto::AUTH_PLAIN,
2458+ initial_response: STANDARD.encode(b"\0hello\0world"),
2459+ }));
2460+ match auth {
2461+ Action::PlainAuth {
2462+ authcid,
2463+ authzid,
2464+ password,
2465+ cb,
2466+ } => {
2467+ assert!(authcid == "hello");
2468+ assert!(authzid == "hello");
2469+ assert!(password == "world");
2470+ assert!(equal(
2471+ &cb(Ok(())),
2472+ &Action::Send(smtp_response!(235, 2, 7, 0, "OK"))
2473+ ));
2474+ }
2475+ _ => panic!("Unexpected response"),
2476+ };
2477+ };
2478+
2479 assert!(session
2480- .hostname
2481+ .authenticated_id
2482 .as_ref()
2483- .is_some_and(|hostname| hostname.to_string() == EXAMPLE_HOSTNAME));
2484+ .is_some_and(|id| id == "hello"));
2485 }
2486
2487- #[tokio::test]
2488- async fn test_verify() {
2489- let requests = &[
2490- TestCase {
2491- request: Request::Helo {
2492- host: EXAMPLE_HOSTNAME.to_string(),
2493- },
2494- payload: None,
2495- expected: Ok(vec![smtp_response!(
2496- 250,
2497- 0,
2498- 0,
2499- 0,
2500- String::from("Hello example.org")
2501- )]),
2502- },
2503- TestCase {
2504- request: Request::Vrfy {
2505- value: "Fuu <bar@baz.com>".to_string(),
2506- },
2507- payload: None,
2508- expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
2509- },
2510- TestCase {
2511- request: Request::Quit {},
2512- payload: None,
2513- expected: Ok(vec![smtp_response!(221, 0, 0, 0, String::from("Ciao!"))]),
2514- },
2515- ];
2516- let session = Mutex::new(Session::default().with_options(
2517- SessionOptions::default().verification(crate::verify::VerifyFunc(
2518- |addr: &EmailAddress| {
2519- let addr = addr.clone();
2520- async move {
2521- assert!(addr.email() == "bar@baz.com");
2522- Ok(())
2523- }
2524- },
2525- )),
2526- ));
2527- process_all(&session, requests).await;
2528- // session should contain both requests
2529- let session = session.lock().await;
2530- assert!(session
2531- .hostname
2532- .as_ref()
2533- .is_some_and(|hostname| hostname.to_string() == EXAMPLE_HOSTNAME));
2534+ #[test]
2535+ fn session_expand() {
2536+ let session = &mut Session::default().authentication(true).expn_enabled(true);
2537+ session.initialized = Some(Mode::Extended);
2538+ session.authenticated_id = Some("hello".to_string());
2539+ match session.next(Some(&Request::Expn {
2540+ value: String::from("group@baz.com"),
2541+ })) {
2542+ Action::Expand { address: _, cb } => {
2543+ assert!(equal(
2544+ &cb(Ok(vec![
2545+ EmailAddress::new_unchecked("fuu@bar.com"),
2546+ EmailAddress::new_unchecked("baz@qux.com")
2547+ ])),
2548+ &Action::SendMany(vec![
2549+ smtp_response!(250, 0, 0, 0, "OK"),
2550+ smtp_response!(250, 0, 0, 0, "fuu@bar.com"),
2551+ smtp_response!(250, 0, 0, 0, "baz@qux.com")
2552+ ])
2553+ ));
2554+ }
2555+ _ => panic!("Unexpected response"),
2556+ };
2557 }
2558
2559- #[tokio::test]
2560- async fn test_non_ascii_characters() {
2561- let mut expected_ehlo_response = EhloResponse::new(String::from("Hello example.org"));
2562- expected_ehlo_response.capabilities = DEFAULT_CAPABILITIES;
2563- expected_ehlo_response.size = DEFAULT_MAXIMUM_MESSAGE_SIZE as usize;
2564- let requests = &[
2565- TestCase {
2566- request: Request::Helo {
2567- host: EXAMPLE_HOSTNAME.to_string(),
2568- },
2569- payload: None,
2570- expected: Ok(vec![smtp_response!(
2571- 250,
2572- 0,
2573- 0,
2574- 0,
2575- String::from("Hello example.org")
2576- )]),
2577- },
2578- TestCase {
2579- request: Request::Data {},
2580- payload: None,
2581- expected: Ok(vec![smtp_response!(
2582- 354,
2583- 0,
2584- 0,
2585- 0,
2586- "Reading data input, end the message with <CRLF>.<CRLF>"
2587- )]),
2588- },
2589- TestCase {
2590- request: Request::Data {},
2591- payload: Some(Bytes::from_static(
2592- r#"Subject: Hello World
2593+ #[test]
2594+ fn session_verify() {
2595+ let session = &mut Session::default().authentication(true).vrfy_enabled(true);
2596+ session.initialized = Some(Mode::Extended);
2597+ session.authenticated_id = Some("hello".to_string());
2598+ match session.next(Some(&Request::Vrfy {
2599+ value: String::from("qux@baz.com"),
2600+ })) {
2601+ Action::Verify { address, cb } => {
2602+ assert!(address.to_string() == "qux@baz.com");
2603+ assert!(equal(
2604+ &cb(Ok(())),
2605+ &Action::Send(smtp_response!(200, 0, 0, 0, "OK"))
2606+ ));
2607+ }
2608+ _ => panic!("Unexpected response"),
2609+ };
2610+ }
2611+
2612+ #[test]
2613+ fn session_non_ascii_characters_legacy_smtp() {
2614+ let session = &mut Session::default();
2615+ // non-extended sessions cannot accept non-ascii characters
2616+ session.initialized = Some(Mode::Legacy);
2617+ session.mail_from = Some(EmailAddress::new_unchecked("fuu@bar.com"));
2618+ match session.next(Some(&Request::Data {})) {
2619+ Action::Data {
2620+ initial_response,
2621+ cb,
2622+ } => {
2623+ assert!(equal(
2624+ &Action::Send(initial_response),
2625+ &Action::Send(smtp_response!(
2626+ 354,
2627+ 0,
2628+ 0,
2629+ 0,
2630+ "Reading data input, end the message with <CRLF>.<CRLF>"
2631+ ))
2632+ ));
2633+ let action = cb(Bytes::from_static(
2634+ r#"
2635+ Subject: Hello World
2636 😍😍😍
2637 "#
2638 .as_bytes(),
2639- )),
2640- expected: Err(smtp_response!(
2641- 500,
2642- 0,
2643- 0,
2644- 0,
2645- "Non ascii characters found in message body"
2646- )),
2647- },
2648- // upgrade the connection to extended mode
2649- TestCase {
2650- request: Request::Ehlo {
2651- host: EXAMPLE_HOSTNAME.to_string(),
2652- },
2653- payload: None,
2654- expected: Ok(vec![Response::Ehlo(expected_ehlo_response)]),
2655- },
2656- TestCase {
2657- request: Request::Data {},
2658- payload: None,
2659- expected: Ok(vec![smtp_response!(
2660- 354,
2661- 0,
2662- 0,
2663- 0,
2664- "Reading data input, end the message with <CRLF>.<CRLF>"
2665- )]),
2666- },
2667- TestCase {
2668- request: Request::Data {},
2669- payload: Some(Bytes::from_static(
2670- r#"Subject: Hello World
2671+ ));
2672+ assert!(equal(
2673+ &action,
2674+ &Action::Send(smtp_response!(
2675+ 500,
2676+ 0,
2677+ 0,
2678+ 0,
2679+ "Non ASCII characters found in message body"
2680+ ))
2681+ ))
2682+ }
2683+ _ => panic!("Unexpected response"),
2684+ };
2685+ }
2686+
2687+ #[test]
2688+ fn session_non_ascii_characters_extended_smtp() {
2689+ let session = &mut Session::default();
2690+ // non-extended sessions cannot accept non-ascii characters
2691+ session.initialized = Some(Mode::Extended);
2692+ session.mail_from = Some(EmailAddress::new_unchecked("fuu@bar.com"));
2693+ match session.next(Some(&Request::Data {})) {
2694+ Action::Data {
2695+ initial_response,
2696+ cb,
2697+ } => {
2698+ assert!(equal(
2699+ &Action::Send(initial_response),
2700+ &Action::Send(smtp_response!(
2701+ 354,
2702+ 0,
2703+ 0,
2704+ 0,
2705+ "Reading data input, end the message with <CRLF>.<CRLF>"
2706+ ))
2707+ ));
2708+ let action = cb(Bytes::from_static(
2709+ r#"
2710+ Subject: Hello World
2711 😍😍😍
2712 "#
2713 .as_bytes(),
2714- )),
2715- expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
2716- },
2717- ];
2718- let session = Mutex::new(
2719- Session::default().with_options(
2720- SessionOptions::default()
2721- .our_hostname(EXAMPLE_HOSTNAME)
2722- .capabilities(DEFAULT_CAPABILITIES),
2723- ),
2724- );
2725- process_all(&session, requests).await;
2726+ ));
2727+ assert!(equal(
2728+ &action,
2729+ &Action::Send(smtp_response!(250, 0, 0, 0, "OK"))
2730+ ))
2731+ }
2732+ _ => panic!("Unexpected response"),
2733+ };
2734 }
2735
2736- #[tokio::test]
2737- async fn test_email_with_body() {
2738- let requests = &[
2739- TestCase {
2740- request: Request::Helo {
2741- host: EXAMPLE_HOSTNAME.to_string(),
2742- },
2743- payload: None,
2744- expected: Ok(vec![smtp_response!(250, 0, 0, 0, "Hello example.org")]),
2745- },
2746- TestCase {
2747- request: Request::Mail {
2748- from: MailFrom {
2749- address: String::from("fuu@example.org"),
2750- ..Default::default()
2751- },
2752- },
2753- payload: None,
2754- expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
2755- },
2756- TestCase {
2757- request: Request::Rcpt {
2758- to: RcptTo {
2759- address: String::from("bar@example.org"),
2760- ..Default::default()
2761- },
2762- },
2763- payload: None,
2764- expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
2765- },
2766- // initiate data transfer
2767- TestCase {
2768- request: Request::Data {},
2769- payload: None,
2770- expected: Ok(vec![smtp_response!(
2771- 354,
2772- 0,
2773- 0,
2774- 0,
2775- "Reading data input, end the message with <CRLF>.<CRLF>"
2776- )]),
2777- },
2778- // send the actual payload
2779- TestCase {
2780- request: Request::Data {},
2781- payload: Some(Bytes::from_static(
2782- br#"Subject: Hello World
2783+ #[test]
2784+ fn session_message_body_ok() {
2785+ let session = &mut Session::default();
2786+ // non-extended sessions cannot accept non-ascii characters
2787+ session.initialized = Some(Mode::Extended);
2788+ session.mail_from = Some(EmailAddress::new_unchecked("fuu@bar.com"));
2789+ {
2790+ match session.next(Some(&Request::Data {})) {
2791+ Action::Data {
2792+ initial_response,
2793+ cb,
2794+ } => {
2795+ assert!(equal(
2796+ &Action::Send(initial_response),
2797+ &Action::Send(smtp_response!(
2798+ 354,
2799+ 0,
2800+ 0,
2801+ 0,
2802+ "Reading data input, end the message with <CRLF>.<CRLF>"
2803+ ))
2804+ ));
2805+ let action = cb(Bytes::from_static(
2806+ r#"To: <baz@qux.com>
2807+ Subject: Hello World
2808
2809 This is an e-mail from a test case!
2810
2811 Note that it doesn't end with a "." since that parsing happens as part of the
2812- transport rather than the session.
2813- "#,
2814- )),
2815- expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
2816- },
2817- ];
2818- let session = Mutex::new(
2819- Session::default()
2820- .with_options(SessionOptions::default().our_hostname(EXAMPLE_HOSTNAME)),
2821- );
2822- process_all(&session, requests).await;
2823- let session = session.lock().await;
2824- assert!(session
2825- .mail_from
2826- .as_ref()
2827- .is_some_and(|mail_from| mail_from.email() == "fuu@example.org"));
2828- assert!(session.rcpt_to.as_ref().is_some_and(|rcpts| rcpts
2829- .first()
2830- .is_some_and(|rcpt_to| rcpt_to.email() == "bar@example.org")));
2831- assert!(session.body.as_ref().is_some_and(|body| {
2832- body.subject()
2833- .is_some_and(|subject| subject == "Hello World")
2834- }));
2835- }
2836+ transport rather than the session. 🩷
2837+ "#
2838+ .as_bytes(),
2839+ ));
2840+ assert!(equal(
2841+ &action,
2842+ &Action::Send(smtp_response!(250, 0, 0, 0, "OK"))
2843+ ))
2844+ }
2845+ _ => panic!("Unexpected response"),
2846+ };
2847+ };
2848
2849- #[tokio::test]
2850- pub async fn test_domain_parsing() {
2851- let mut session = Session::default();
2852- for host in ["127.0.0.1", "[127.0.0.1]", "example.org", "IPv6: ::1"] {
2853- session
2854- .process(&Request::Ehlo {
2855- host: host.to_string(),
2856- })
2857- .await
2858- .unwrap();
2859- }
2860+ let message_body = session.body.clone().unwrap();
2861+
2862+ assert!(message_body
2863+ .to()
2864+ .is_some_and(|to| to.first().is_some_and(|to| to
2865+ .address
2866+ .as_ref()
2867+ .is_some_and(|addr| { addr == "baz@qux.com" }))));
2868+ assert!(message_body
2869+ .subject()
2870+ .is_some_and(|subject| subject == "Hello World"));
2871 }
2872 }
2873 diff --git a/maitred/src/transport.rs b/maitred/src/transport.rs
2874index 3a7650b..9a62879 100644
2875--- a/maitred/src/transport.rs
2876+++ b/maitred/src/transport.rs
2877 @@ -3,11 +3,13 @@ use std::{fmt::Display, io::Write};
2878 use bytes::{Bytes, BytesMut};
2879 use smtp_proto::request::receiver::{BdatReceiver, DataReceiver, RequestReceiver};
2880 use smtp_proto::Error as SmtpError;
2881- pub use smtp_proto::{EhloResponse, Request, Response as SmtpResponse};
2882+ use smtp_proto::Request;
2883 use tokio_util::codec::{Decoder, Encoder};
2884
2885+ use crate::session::Response;
2886+
2887 #[derive(Debug, thiserror::Error)]
2888- pub(crate) enum TransportError {
2889+ pub enum TransportError {
2890 /// Returned when a client attempts to send multiple commands sequentially
2891 /// to the server without waiting for a response but piplining isn't
2892 /// enabled.
2893 @@ -21,53 +23,6 @@ pub(crate) enum TransportError {
2894 Io(#[from] std::io::Error),
2895 }
2896
2897- #[derive(Debug, Clone)]
2898- pub enum Response<T>
2899- where
2900- T: Display,
2901- {
2902- General(SmtpResponse<T>),
2903- Ehlo(EhloResponse<T>),
2904- }
2905-
2906- impl Response<String> {
2907- pub fn is_fatal(&self) -> bool {
2908- match self {
2909- Response::General(resp) => resp.code >= 500,
2910- Response::Ehlo(_) => false,
2911- }
2912- }
2913- }
2914-
2915- impl<T> PartialEq for Response<T>
2916- where
2917- T: Display,
2918- {
2919- fn eq(&self, other: &Self) -> bool {
2920- match self {
2921- Response::General(req) => match other {
2922- Response::General(other) => req.to_string() == other.to_string(),
2923- Response::Ehlo(_) => false,
2924- },
2925- Response::Ehlo(req) => match other {
2926- Response::General(_) => false,
2927- Response::Ehlo(other) => {
2928- // FIXME
2929- req.capabilities == other.capabilities
2930- && req.hostname.to_string() == other.hostname.to_string()
2931- && req.deliver_by == other.deliver_by
2932- && req.size == other.size
2933- && req.auth_mechanisms == other.auth_mechanisms
2934- && req.future_release_datetime.eq(&req.future_release_datetime)
2935- && req.future_release_interval.eq(&req.future_release_interval)
2936- }
2937- },
2938- }
2939- }
2940- }
2941-
2942- impl<T> Eq for Response<T> where T: Display {}
2943-
2944 struct Wrapper<'a>(&'a mut BytesMut);
2945
2946 impl Write for Wrapper<'_> {
2947 @@ -88,7 +43,7 @@ pub(crate) enum Receiver {
2948
2949 /// Command from the client with an optional attached payload.
2950 #[derive(Debug)]
2951- pub(crate) enum Command {
2952+ pub enum Command {
2953 Requests(Vec<Request<String>>),
2954 Payload(Bytes),
2955 }
2956 @@ -105,7 +60,7 @@ impl Display for Command {
2957 /// Line oriented transport
2958 /// TODO: BINARYMIME
2959 #[derive(Default)]
2960- pub(crate) struct Transport {
2961+ pub struct Transport {
2962 receiver: Option<Box<Receiver>>,
2963 buf: Vec<u8>,
2964 pipelining: bool,
2965 @@ -133,6 +88,7 @@ impl Encoder<Response<String>> for Transport {
2966 type Error = TransportError;
2967
2968 fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> Result<(), Self::Error> {
2969+ tracing::debug!("Writing response: {:?}", item);
2970 match item {
2971 Response::General(item) => {
2972 item.write(Wrapper(dst))?;
2973 diff --git a/maitred/src/verify.rs b/maitred/src/verify.rs
2974index cd93573..6497908 100644
2975--- a/maitred/src/verify.rs
2976+++ b/maitred/src/verify.rs
2977 @@ -2,6 +2,10 @@ use std::future::Future;
2978
2979 use async_trait::async_trait;
2980 use email_address::EmailAddress;
2981+ use smtp_proto::Response as SmtpResponse;
2982+
2983+ use crate::session::Response;
2984+ use crate::smtp_response;
2985
2986 /// An error encountered while verifying an e-mail address
2987 #[derive(Debug, thiserror::Error)]
2988 @@ -21,6 +25,20 @@ pub enum VerifyError {
2989 },
2990 }
2991
2992+ #[allow(clippy::from_over_into)]
2993+ impl Into<Response<String>> for VerifyError {
2994+ fn into(self) -> Response<String> {
2995+ match self {
2996+ VerifyError::Server(_) => smtp_response!(500, 0, 0, 0, self.to_string()),
2997+ VerifyError::NotFound(_) => smtp_response!(404, 0, 0, 0, self.to_string()),
2998+ VerifyError::Ambiguous {
2999+ email: _,
3000+ alternatives: _,
3001+ } => smtp_response!(500, 0, 0, 0, self.to_string()),
3002+ }
3003+ }
3004+ }
3005+
3006 /// Verify that the given e-mail address exists on the server. Servers may
3007 /// choose to implement nothing or not use this option at all if desired.
3008 #[async_trait]
3009 diff --git a/maitred/src/worker.rs b/maitred/src/worker.rs
3010index 293d5b1..b015d2c 100644
3011--- a/maitred/src/worker.rs
3012+++ b/maitred/src/worker.rs
3013 @@ -8,8 +8,9 @@ use tokio::sync::{mpsc::Receiver, Mutex};
3014 use crate::delivery::Delivery;
3015 use crate::milter::Milter;
3016 use crate::rewrite::Rewrite;
3017+ use crate::server::ServerError;
3018+ use crate::session::Envelope;
3019 use crate::validation::Validation;
3020- use crate::{Envelope, Error};
3021
3022 const HEADER_DKIM_RESULT: &str = "Maitred-Dkim-Result";
3023
3024 @@ -19,7 +20,6 @@ const HEADER_DKIM_RESULT: &str = "Maitred-Dkim-Result";
3025 /// Sequentially applying milters in the order they were configured
3026 /// Running DKIM verification
3027 /// ARC Verficiation
3028- /// SPF Verification
3029 pub(crate) struct Worker {
3030 pub milter: Option<Arc<dyn Milter>>,
3031 pub delivery: Option<Arc<dyn Delivery>>,
3032 @@ -45,7 +45,7 @@ impl Worker {
3033 })
3034 }
3035
3036- pub async fn process(&mut self) -> Result<(), Error> {
3037+ pub async fn process(&mut self) -> Result<(), ServerError> {
3038 let mut ticker =
3039 tokio::time::interval_at(tokio::time::Instant::now(), Duration::from_millis(800));
3040