Commit
Author: Kevin Schoon [me@kevinschoon.com]
Hash: dbf889ce40d240ac8b8386ae358340ec28bbeb45
Timestamp: Fri, 30 Aug 2024 15:32:12 +0000 (3 months ago)

+332 -525 +/-7 browse
rewrite transport to handle pipelining on it's own
1diff --git a/maitred/src/error.rs b/maitred/src/error.rs
2index 1694437..6fc07a3 100644
3--- a/maitred/src/error.rs
4+++ b/maitred/src/error.rs
5 @@ -1,28 +1,10 @@
6- use std::string::FromUtf8Error;
7-
8- use smtp_proto::Error as SmtpError;
9- use url::ParseError;
10-
11-
12 /// Any fatal error that is encountered by the server that should cause it
13 /// to shutdown and stop processing connections.
14 #[derive(Debug, thiserror::Error)]
15 pub enum Error {
16- /// An unspecified internal error
17- #[error("Unspecified internal error: {0}")]
18- Internal(String),
19 /// An IO related error such as not being able to bind to a TCP socket
20 #[error("Io: {0}")]
21 Io(#[from] std::io::Error),
22- /// An error generated from the underlying SMTP protocol
23- #[error("Smtp failure: {0}")]
24- Smtp(#[from] SmtpError),
25- /// Inability to parse a URL
26- #[error("Failed to parse Url: {0}")]
27- UrlParsing(#[from] ParseError),
28- /// UTF8 related Error
29- #[error("Failed to read UTF8: {0}")]
30- Utf8(#[from] FromUtf8Error),
31 /// Session timeout
32 #[error("Client took too long to respond: {0}s")]
33 Timeout(u64),
34 diff --git a/maitred/src/lib.rs b/maitred/src/lib.rs
35index c8b9614..8b20e4b 100644
36--- a/maitred/src/lib.rs
37+++ b/maitred/src/lib.rs
38 @@ -49,7 +49,6 @@ mod delivery;
39 mod error;
40 mod expand;
41 mod milter;
42- mod pipeline;
43 mod server;
44 mod session;
45 mod transport;
46 @@ -64,7 +63,6 @@ pub use expand::{Error as ExpansionError, Expansion, Func as ExpansionFunc};
47 pub use milter::{Error as MilterError, Milter};
48
49 pub use error::Error;
50- pub use pipeline::{Pipeline, Transaction};
51 pub use server::Server;
52 pub use session::{
53 Options as SessionOptions, Session, DEFAULT_CAPABILITIES, DEFAULT_GREETING,
54 @@ -137,45 +135,3 @@ macro_rules! smtp_response {
55 };
56 }
57 pub(crate) use smtp_response;
58-
59- /// Generate a Chunk that contains a single SMTP response.
60- #[macro_export]
61- macro_rules! smtp_chunk {
62- ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
63- Chunk(vec![Response::General(SmtpResponse::new(
64- $code,
65- $e1,
66- $e2,
67- $e3,
68- $name.to_string(),
69- ))])
70- };
71- }
72-
73- /// Generate a Chunk that contains a single successful response.
74- #[macro_export]
75- macro_rules! smtp_chunk_ok {
76- ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
77- Ok::<Chunk, Chunk>(Chunk(vec![Response::General(SmtpResponse::new(
78- $code,
79- $e1,
80- $e2,
81- $e3,
82- $name.to_string(),
83- ))]))
84- };
85- }
86-
87- /// Generate a Chunk that that contains a single error.
88- #[macro_export]
89- macro_rules! smtp_chunk_err {
90- ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
91- Err::<Chunk, Chunk>(Chunk(vec![Response::General(SmtpResponse::new(
92- $code,
93- $e1,
94- $e2,
95- $e3,
96- $name.to_string(),
97- ))]))
98- };
99- }
100 diff --git a/maitred/src/pipeline.rs b/maitred/src/pipeline.rs
101deleted file mode 100644
102index 353ed27..0000000
103--- a/maitred/src/pipeline.rs
104+++ /dev/null
105 @@ -1,212 +0,0 @@
106- use crate::session::Result as SessionResult;
107- use crate::{Chunk, Request};
108-
109- pub type Transaction = (Request<String>, SessionResult);
110-
111- fn flatten(history: &[Transaction]) -> Chunk {
112- Chunk(
113- history
114- .iter()
115- .map(|tx| tx.1.clone().unwrap_or_else(|e| e))
116- .fold(Vec::new(), |mut accm, chunk| {
117- accm.extend(chunk.0.clone());
118- accm
119- })
120- .into_iter()
121- .collect(),
122- )
123- }
124-
125- /// Pipeline chunks session request/responses into logical groups returning
126- /// a response only once a session is considered "completed".
127- #[derive(Default)]
128- pub struct Pipeline {
129- history: Vec<Transaction>,
130- }
131-
132- impl Pipeline {
133- /// Checks if the pipeline is within a data transaction (if the previous
134- /// command was DATA/BDAT).
135- fn within_tx(&self) -> bool {
136- self.history
137- .last()
138- .map(|tx| {
139- matches!(tx.0, Request::Data)
140- || matches!(
141- tx.0,
142- Request::Bdat {
143- chunk_size: _,
144- is_last: _
145- }
146- )
147- })
148- .is_some_and(|is_tx| is_tx)
149- }
150-
151- fn chunk(&mut self) -> Chunk {
152- let mail_from_ok = self.history.iter().fold(false, |ok, tx| {
153- if matches!(&tx.0, Request::Mail { from: _ }) {
154- return tx.1.is_ok();
155- }
156- ok
157- });
158- let rcpt_to_ok_count = self.history.iter().fold(0, |count, tx| {
159- if matches!(&tx.0, Request::Rcpt { to: _ }) && tx.1.is_ok() {
160- return count + 1;
161- }
162- count
163- });
164- let last_command = self
165- .history
166- .last()
167- .expect("to results called without history");
168- if last_command.1.is_ok() && mail_from_ok && rcpt_to_ok_count > 0 {
169- flatten(&self.history)
170- } else if !mail_from_ok || rcpt_to_ok_count <= 0 {
171- self.history.pop();
172- flatten(&self.history)
173- } else {
174- flatten(&self.history)
175- }
176- }
177-
178- /// Per RFC 2920:
179- /// Once the client SMTP has confirmed that support exists for the
180- /// pipelining extension, the client SMTP may then elect to transmit
181- /// groups of SMTP commands in batches without waiting for a response to
182- /// each individual command. In particular, the commands RSET, MAIL FROM,
183- /// SEND FROM, SOML FROM, SAML FROM, and RCPT TO can all appear anywhere
184- /// in a pipelined command group. The EHLO, DATA, VRFY, EXPN, TURN,
185- /// QUIT, and NOOP commands can only appear as the last command in a
186- /// group since their success or failure produces a change of state which
187- /// the client SMTP must accommodate. (NOOP is included in this group so
188- /// it can be used as a synchronization point.)
189- ///
190- /// Process the next session result in the pipeline and return a vector of
191- /// results if the command requires a server response. Returns true if
192- /// the process was successful or false if it contained fatal errors. All
193- /// results must be passed to the client.
194- pub fn process(&mut self, req: &Request<String>, res: &SessionResult) -> Chunk {
195- let is_data_tx = self.within_tx();
196- if is_data_tx {
197- // ignore the first data request
198- self.history.pop();
199- self.history.push((req.clone(), res.clone()));
200- } else {
201- self.history.push((req.clone(), res.clone()));
202- }
203- match req {
204- Request::Ehlo { host: _ } => {
205- self.history.clear();
206- res.clone().unwrap_or_else(|e| e)
207- }
208- Request::Lhlo { host: _ } => {
209- self.history.clear();
210- res.clone().unwrap_or_else(|e| e)
211- }
212- Request::Helo { host: _ } => {
213- self.history.clear();
214- res.clone().unwrap_or_else(|e| e)
215- }
216- Request::Mail { from: _ } => Chunk::default(),
217- Request::Rcpt { to: _ } => Chunk::default(),
218- Request::Bdat {
219- chunk_size: _,
220- is_last: _,
221- } => {
222- if is_data_tx {
223- let chunk = self.chunk();
224- self.history.clear();
225- chunk
226- } else {
227- Chunk::default()
228- }
229- }
230- Request::Auth {
231- mechanism: _,
232- initial_response: _,
233- } => todo!(),
234- Request::Noop { value: _ } => res.clone().unwrap_or_else(|e| e),
235- Request::Vrfy { value: _ } => todo!(),
236- Request::Expn { value: _ } => todo!(),
237- Request::Help { value: _ } => res.clone().unwrap_or_else(|e| e),
238- Request::Etrn { name: _ } => todo!(),
239- Request::Atrn { domains: _ } => todo!(),
240- Request::Burl { uri: _, is_last: _ } => todo!(),
241- Request::StartTls => todo!(),
242- Request::Data => {
243- if is_data_tx {
244- let chunk = self.chunk();
245- self.history.clear();
246- chunk
247- } else {
248- Chunk::default()
249- }
250- }
251- Request::Rset => {
252- self.history.clear();
253- res.clone().unwrap_or_else(|e| e)
254- }
255- Request::Quit => res.clone().unwrap_or_else(|e| e),
256- }
257- }
258- }
259-
260- #[cfg(test)]
261- mod test {
262-
263- use super::*;
264- use crate::{smtp_chunk_ok, transport::Response, Request};
265- use smtp_proto::Response as SmtpResponse;
266-
267- #[test]
268- pub fn test_pipeline_basic() {
269- let mut pipeline = Pipeline::default();
270- assert!(
271- pipeline
272- .process(
273- &Request::Helo {
274- host: "example.org".to_string(),
275- },
276- &smtp_chunk_ok!(200, 0, 0, 0, "OK")
277- )
278- .0
279- .len()
280- == 1
281- );
282- // batchable commands out of order
283- assert!(pipeline
284- .process(
285- &Request::Rcpt {
286- to: smtp_proto::RcptTo {
287- address: "baz@qux.com".to_string(),
288- ..Default::default()
289- },
290- },
291- &smtp_chunk_ok!(200, 0, 0, 0, "OK: baz@qux.com")
292- )
293- .0
294- .is_empty());
295- assert!(pipeline
296- .process(
297- &Request::Mail {
298- from: smtp_proto::MailFrom {
299- address: "fuu@bar.com".to_string(),
300- ..Default::default()
301- }
302- },
303- &smtp_chunk_ok!(200, 0, 0, 0, "OK: fuu@bar.com")
304- )
305- .0
306- .is_empty());
307-
308- // initialize a data request
309- assert!(pipeline
310- .process(&Request::Data {}, &smtp_chunk_ok!(200, 0, 0, 0, "OK"))
311- .0
312- .is_empty());
313- // simulate the end of a request
314- let result = pipeline.process(&Request::Data {}, &smtp_chunk_ok!(200, 0, 0, 0, "OK"));
315- assert!(result.0.len() == 3);
316- }
317- }
318 diff --git a/maitred/src/server.rs b/maitred/src/server.rs
319index 50ef55f..2f98398 100644
320--- a/maitred/src/server.rs
321+++ b/maitred/src/server.rs
322 @@ -4,7 +4,6 @@ use std::rc::Rc;
323 use std::sync::Arc;
324 use std::time::Duration;
325
326- use bytes::Bytes;
327 use crossbeam_deque::Injector;
328 use crossbeam_deque::Stealer;
329 use crossbeam_deque::Worker as WorkQueue;
330 @@ -12,53 +11,32 @@ use futures::SinkExt;
331 use futures::StreamExt;
332 use mail_parser::Message;
333 use smtp_proto::Request;
334+ use tokio::net::TcpListener;
335 use tokio::sync::mpsc::Sender;
336 use tokio::sync::Mutex;
337 use tokio::task::JoinHandle;
338- use tokio::{net::TcpListener, time::timeout};
339+ use tokio::time::timeout;
340 use tokio_stream::{self as stream};
341 use tokio_util::codec::Framed;
342
343 use crate::error::Error;
344- use crate::pipeline::Pipeline;
345 use crate::session::Session;
346- use crate::transport::{Response, Transport};
347+ use crate::transport::Command;
348+ use crate::transport::Transport;
349 use crate::worker::{Packet, Worker};
350- use crate::{Chunk, Delivery, Milter};
351+ use crate::{Delivery, Milter, Response, SmtpResponse};
352
353 /// The default port the server will listen on if none was specified in it's
354 /// configuration options.
355 pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:2525";
356
357- // Maximum amount of time the server will wait for a command before closing
358- // the connection.
359+ /// Maximum amount of time the server will wait for a command before closing
360+ /// the connection.
361 const DEFAULT_GLOBAL_TIMEOUT_SECS: u64 = 300;
362
363- /// Apply pipelining if running in extended mode and configured to support it
364- struct ConditionalPipeline<'a> {
365- pub session: &'a Mutex<Session>,
366- pub pipeline: &'a mut Pipeline,
367- }
368-
369- impl ConditionalPipeline<'_> {
370- pub async fn apply(&mut self, req: &Request<String>, data: Option<&Bytes>) -> Chunk {
371- let mut session = self.session.lock().await;
372- let response = session.process(req, data).await;
373- if session.has_capability(smtp_proto::EXT_PIPELINING) && session.is_extended() {
374- self.pipeline.process(req, &response)
375- } else {
376- match response {
377- Ok(response) => {
378- tracing::debug!("Client response: {:?}", response);
379- response
380- }
381- Err(response) => {
382- tracing::warn!("Client error: {:?}", response);
383- response
384- }
385- }
386- }
387- }
388+ /// check if the final command is QUIT
389+ fn is_quit(reqs: &[Request<String>]) -> bool {
390+ reqs.last().is_some_and(|req| matches!(req, Request::Quit))
391 }
392
393 /// Server implements everything that is required to run an SMTP server by
394 @@ -162,43 +140,72 @@ where
395 async fn process<T>(
396 &self,
397 mut framed: Framed<T, Transport>,
398- pipeline: &mut ConditionalPipeline<'_>,
399- greeting: Response<String>,
400+ msg_queue: Arc<Injector<Packet>>,
401 ) -> Result<(), Error>
402 where
403 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin,
404 {
405+ let mut session = Session::default();
406+ if let Some(opts) = &self.options {
407+ session = session.with_options(opts.clone());
408+ }
409+
410+ let greeting = session.greeting();
411+ let mut session = session;
412 // send inital server greeting
413 framed.send(greeting).await?;
414
415- 'outer: loop {
416+ let mut shutdown = false;
417+ 'outer: while !shutdown {
418 let frame = timeout(self.global_timeout, framed.next()).await;
419 match frame {
420- Ok(request) => {
421- if let Some(command) = request {
422- let mut finished = false;
423- match command {
424- Ok(command) => {
425- if matches!(command.0, Request::Quit) {
426- finished = true;
427- }
428- let responses =
429- pipeline.apply(&command.0, command.1.as_ref()).await;
430- for response in responses.0.into_iter() {
431+ Ok(Some(Ok(Command::Requests(commands)))) => {
432+ shutdown = is_quit(commands.as_slice());
433+ for command in commands {
434+ match session.process(&command).await {
435+ Ok(responses) => {
436+ for response in responses {
437 framed.send(response).await?;
438 }
439- if finished {
440- break 'outer;
441- }
442 }
443- Err(err) => {
444- tracing::warn!("Socket closed with error: {:?}", err);
445- return Err(err);
446+ Err(e) => {
447+ tracing::warn!("Client error: {:?}", e);
448+ framed.send(e).await?;
449 }
450 }
451- } else {
452- break 'outer;
453+ }
454+ }
455+ Ok(Some(Ok(Command::Payload(payload)))) => {
456+ match session.handle_data(&payload).await {
457+ Ok(responses) => {
458+ for response in responses {
459+ framed.send(response).await?;
460+ }
461+ msg_queue.push(Packet::from(&session));
462+ }
463+ Err(response) => {
464+ tracing::warn!("Error handling message payload: {:?}", response);
465+ framed.send(response).await?;
466+ }
467+ }
468+ }
469+ Ok(Some(Err(err))) => {
470+ tracing::error!("Internal server error: {}", err);
471+ let response = match err {
472+ crate::transport::Error::PipelineNotEnabled => {
473+ crate::smtp_response!(500, 0, 0, 0, "Pipelining is not enabled")
474+ }
475+ crate::transport::Error::Smtp(e) => {
476+ crate::smtp_response!(500, 0, 0, 0, e.to_string())
477+ }
478+ // IO Errors considered fatal for the entire session
479+ crate::transport::Error::Io(e) => return Err(Error::Io(e)),
480 };
481+ framed.send(response).await?;
482+ }
483+ Ok(None) => {
484+ tracing::info!("Client connection closing");
485+ break 'outer;
486 }
487 Err(timeout) => {
488 tracing::warn!("Client connection exceeded: {:?}", self.global_timeout);
489 @@ -209,7 +216,6 @@ where
490 }
491 }
492 }
493- tracing::info!("Connection closed");
494 Ok(())
495 }
496
497 @@ -268,23 +274,11 @@ where
498 let (socket, _) = listener.accept().await.unwrap();
499 let addr = socket.local_addr()?;
500 tracing::info!("Accepted connection on: {:?}", addr);
501- let framed = Framed::new(socket, Transport::default());
502- let mut session = Session::default();
503- if let Some(opts) = &self.options {
504- session = session.with_options(opts.clone());
505- }
506+ let framed = Framed::new(socket, Transport::default().pipelining(true));
507
508- let greeting = session.greeting();
509- let session = Mutex::new(session);
510- let mut pipelined = ConditionalPipeline {
511- session: &session,
512- pipeline: &mut Pipeline::default(),
513- };
514-
515- match self.process(framed, &mut pipelined, greeting).await {
516+ match self.process(framed, global_queue.clone()).await {
517 Ok(_) => {
518- let session = session.into_inner();
519- global_queue.push(session.into());
520+ tracing::info!("Client connection finished normally");
521 }
522 Err(err) => {
523 tracing::warn!("Client encountered an error: {:?}", err);
524 @@ -357,7 +351,7 @@ mod test {
525 }
526
527 #[tokio::test]
528- async fn test_server_process() {
529+ async fn test_server_pipelined() {
530 let stream = FakeStream {
531 buffer: vec![
532 "HELO example.org\r\n".into(),
533 @@ -377,24 +371,14 @@ mod test {
534 Box::pin(async move { Ok(()) })
535 }));
536 let framed = Framed::new(stream, Transport::default());
537- let session = Session::default();
538- let greeting = session.greeting();
539- let session = Mutex::new(session);
540-
541- let mut pipelined = ConditionalPipeline {
542- session: &session,
543- pipeline: &mut Pipeline::default(),
544- };
545- server
546- .process(framed, &mut pipelined, greeting)
547- .await
548- .unwrap();
549- let session = session.lock().await;
550- assert!(session
551+ let global_queue = Arc::new(Injector::<Packet>::new());
552+ server.process(framed, global_queue.clone()).await.unwrap();
553+ let packet = global_queue.steal().success().unwrap();
554+ assert!(packet
555 .mail_from
556 .as_ref()
557 .is_some_and(|mail_from| mail_from.email() == "fuu@bar.com"));
558- assert!(session.rcpt_to.as_ref().is_some_and(|rcpts| rcpts
559+ assert!(packet.rcpt_to.as_ref().is_some_and(|rcpts| rcpts
560 .first()
561 .is_some_and(|rcpt_to| rcpt_to.email() == "baz@qux.com")));
562 }
563 diff --git a/maitred/src/session.rs b/maitred/src/session.rs
564index 7f898e9..c78c92c 100644
565--- a/maitred/src/session.rs
566+++ b/maitred/src/session.rs
567 @@ -11,10 +11,9 @@ use smtp_proto::{EhloResponse, Request, Response as SmtpResponse};
568 use url::Host;
569
570 use crate::expand::Expansion;
571+ use crate::smtp_response;
572 use crate::transport::Response;
573 use crate::verify::Verify;
574- use crate::{smtp_chunk, smtp_chunk_err, smtp_chunk_ok};
575- use crate::{smtp_response, Chunk};
576
577 /// Default help banner returned from a HELP command without any parameters
578 pub const DEFAULT_HELP_BANNER: &str = r#"
579 @@ -42,12 +41,12 @@ pub const DEFAULT_GREETING: &str = "Maitred ESMTP Server";
580 /// Default SMTP capabilities advertised by the server
581 pub const DEFAULT_CAPABILITIES: u32 = smtp_proto::EXT_SIZE
582 | smtp_proto::EXT_ENHANCED_STATUS_CODES
583- // | smtp_proto::EXT_PIPELINING FIXME broken in swaks
584+ | smtp_proto::EXT_PIPELINING
585 | smtp_proto::EXT_8BIT_MIME;
586
587 /// Result generated as part of an SMTP session, an Err indicates a session
588 /// level error that will be returned to the client.
589- pub type Result = StdResult<Chunk, Chunk>;
590+ pub type Result = StdResult<Vec<Response<String>>, Response<String>>;
591
592 enum Mode {
593 Legacy,
594 @@ -192,17 +191,23 @@ impl Session {
595 }
596
597 /// Ensure that the session has been initialized otherwise return an error
598- fn check_initialized(&self) -> StdResult<(), Chunk> {
599+ fn check_initialized(&self) -> StdResult<(), Response<String>> {
600 if self.initialized.is_none() {
601- return Err(smtp_chunk!(500, 0, 0, 0, "It's polite to say EHLO first"));
602+ return Err(smtp_response!(
603+ 500,
604+ 0,
605+ 0,
606+ 0,
607+ "It's polite to say EHLO first"
608+ ));
609 }
610 Ok(())
611 }
612
613 /// checks if 8BITMIME is supported
614- fn check_body(&self, body: &[u8]) -> StdResult<(), Chunk> {
615+ fn check_body(&self, body: &[u8]) -> StdResult<(), Response<String>> {
616 if !self.has_capability(smtp_proto::EXT_8BIT_MIME) && !body.is_ascii() {
617- return Err(smtp_chunk!(
618+ return Err(smtp_response!(
619 500,
620 0,
621 0,
622 @@ -213,6 +218,60 @@ impl Session {
623 Ok(())
624 }
625
626+ pub async fn handle_data(&mut self, data: &Bytes) -> Result {
627+ self.check_initialized()?;
628+ let transfer_mode = self
629+ .data_transfer
630+ .as_ref()
631+ .expect("transfer is not initalized");
632+ match transfer_mode {
633+ DataTransfer::Data => {
634+ let message_payload = data.to_vec();
635+ self.check_body(&message_payload)?;
636+ let parser = MessageParser::new();
637+ match parser.parse(&message_payload) {
638+ Some(msg) => {
639+ self.body = Some(msg.into_owned());
640+ self.data_transfer = None;
641+ Ok(vec![smtp_response!(250, 0, 0, 0, "OK")])
642+ }
643+ None => {
644+ self.data_transfer = None;
645+ Ok(vec![smtp_response!(
646+ 500,
647+ 0,
648+ 0,
649+ 0,
650+ "Cannot parse message payload".to_string()
651+ )])
652+ }
653+ }
654+ }
655+ DataTransfer::Bdat => {
656+ let message_payload = data.to_vec();
657+ self.check_body(&message_payload)?;
658+ let parser = MessageParser::new();
659+ match parser.parse(&message_payload) {
660+ Some(msg) => {
661+ self.body = Some(msg.into_owned());
662+ self.data_transfer = None;
663+ Ok(vec![smtp_response!(250, 0, 0, 0, "OK")])
664+ }
665+ None => {
666+ self.data_transfer = None;
667+ Ok(vec![smtp_response!(
668+ 500,
669+ 0,
670+ 0,
671+ 0,
672+ "Cannot parse message payload".to_string()
673+ )])
674+ }
675+ }
676+ }
677+ }
678+ }
679+
680 /// Statefully process the SMTP command with optional data payload, any
681 /// error returned is passed back to the caller.
682 /// NOTE:
683 @@ -221,35 +280,50 @@ impl Session {
684 /// indicate that the process is starting and the second one contains the
685 /// parsed bytes from the transfer.
686 /// FIXME: Not at all reasonable yet
687- pub async fn process(&mut self, req: &Request<String>, data: Option<&Bytes>) -> Result {
688+ pub async fn process(&mut self, req: &Request<String>) -> Result {
689 match req {
690 Request::Ehlo { host } => {
691- self.hostname =
692- Some(Host::parse(host).map_err(|e| smtp_chunk!(500, 0, 0, 0, e.to_string()))?);
693+ self.hostname = Some(
694+ Host::parse(host).map_err(|e| smtp_response!(500, 0, 0, 0, e.to_string()))?,
695+ );
696 self.reset();
697 self.initialized = Some(Mode::Extended);
698 let mut resp = EhloResponse::new(format!("Hello {}", host));
699 resp.capabilities = self.opts.capabilities;
700 resp.size = self.opts.maximum_size as usize;
701- Ok(Chunk(vec![Response::Ehlo(resp)]))
702+ Ok(vec![Response::Ehlo(resp)])
703 }
704 Request::Lhlo { host } => {
705- self.hostname = Some(Host::parse(host).map_err(|e| smtp_chunk!(500, 0, 0, 0, e))?);
706+ self.hostname =
707+ Some(Host::parse(host).map_err(|e| smtp_response!(500, 0, 0, 0, e))?);
708 self.reset();
709 self.initialized = Some(Mode::Legacy);
710- smtp_chunk_ok!(250, 0, 0, 0, format!("Hello {}", host))
711+ Ok(vec![smtp_response!(
712+ 250,
713+ 0,
714+ 0,
715+ 0,
716+ format!("Hello {}", host)
717+ )])
718 }
719 Request::Helo { host } => {
720- self.hostname =
721- Some(Host::parse(host).map_err(|e| smtp_chunk!(500, 0, 0, 0, e.to_string()))?);
722+ self.hostname = Some(
723+ Host::parse(host).map_err(|e| smtp_response!(500, 0, 0, 0, e.to_string()))?,
724+ );
725 self.reset();
726 self.initialized = Some(Mode::Legacy);
727- smtp_chunk_ok!(250, 0, 0, 0, format!("Hello {}", host))
728+ Ok(vec![smtp_response!(
729+ 250,
730+ 0,
731+ 0,
732+ 0,
733+ format!("Hello {}", host)
734+ )])
735 }
736 Request::Mail { from } => {
737 self.check_initialized()?;
738 let mail_from = EmailAddress::from_str(from.address.as_str()).map_err(|e| {
739- smtp_chunk!(
740+ smtp_response!(
741 500,
742 0,
743 0,
744 @@ -258,59 +332,34 @@ impl Session {
745 )
746 })?;
747 self.mail_from = Some(mail_from.clone());
748- smtp_chunk_ok!(250, 0, 0, 0, "OK")
749+ Ok(vec![smtp_response!(250, 0, 0, 0, "OK")])
750 }
751 Request::Rcpt { to } => {
752 self.check_initialized()?;
753 let rcpt_to = EmailAddress::from_str(to.address.as_str()).map_err(|e| {
754- smtp_chunk!(500, 0, 0, 0, format!("cannot parse: {} {}", to.address, e))
755+ smtp_response!(500, 0, 0, 0, format!("cannot parse: {} {}", to.address, e))
756 })?;
757 if let Some(ref mut rcpts) = self.rcpt_to {
758 rcpts.push(rcpt_to.clone());
759 } else {
760 self.rcpt_to = Some(vec![rcpt_to.clone()]);
761 }
762- smtp_chunk_ok!(250, 0, 0, 0, "OK")
763+ Ok(vec![smtp_response!(250, 0, 0, 0, "OK")])
764 }
765 Request::Bdat {
766 chunk_size: _,
767 is_last: _,
768 } => {
769 self.check_initialized()?;
770- if let Some(transfer_mode) = &self.data_transfer {
771- match transfer_mode {
772- DataTransfer::Data => {
773- panic!("Transfer mode changed from DATA to BDAT")
774- }
775- DataTransfer::Bdat => {
776- let message_payload =
777- data.expect("data returned without a payload").to_vec();
778- self.check_body(&message_payload)?;
779- let parser = MessageParser::new();
780- match parser.parse(&message_payload) {
781- Some(msg) => {
782- self.body = Some(msg.into_owned());
783- self.data_transfer = None;
784- smtp_chunk_ok!(250, 0, 0, 0, "OK")
785- }
786- None => {
787- self.data_transfer = None;
788- smtp_chunk_err!(
789- 500,
790- 0,
791- 0,
792- 0,
793- "Cannot parse message payload".to_string()
794- )
795- }
796- }
797- }
798- }
799- } else {
800- tracing::info!("Initializing data transfer mode");
801- self.data_transfer = Some(DataTransfer::Bdat);
802- smtp_chunk_ok!(354, 0, 0, 0, "Starting BDAT data transfer".to_string())
803- }
804+ tracing::info!("Initializing data transfer mode");
805+ self.data_transfer = Some(DataTransfer::Bdat);
806+ Ok(vec![smtp_response!(
807+ 354,
808+ 0,
809+ 0,
810+ 0,
811+ "Starting BDAT data transfer".to_string()
812+ )])
813 }
814 Request::Auth {
815 mechanism,
816 @@ -318,21 +367,19 @@ impl Session {
817 } => todo!(),
818 Request::Noop { value: _ } => {
819 self.check_initialized()?;
820- smtp_chunk_ok!(250, 0, 0, 0, "OK".to_string())
821+ Ok(vec![smtp_response!(250, 0, 0, 0, "OK".to_string())])
822 }
823 Request::Vrfy { value } => {
824 if let Some(verifier) = &self.opts.verification {
825 let address = EmailAddress::from_str(value.as_str()).map_err(|e| {
826- smtp_chunk!(500, 0, 0, 0, format!("cannot parse: {} {}", value, e))
827+ smtp_response!(500, 0, 0, 0, format!("cannot parse: {} {}", value, e))
828 })?;
829 match verifier.verify(&address).await {
830- Ok(_) => {
831- smtp_chunk_ok!(250, 0, 0, 0, "OK".to_string())
832- }
833- Err(e) => Err(e.into()),
834+ Ok(_) => Ok(vec![smtp_response!(250, 0, 0, 0, "OK".to_string())]),
835+ Err(e) => Err(smtp_response!(500, 0, 0, 0, e.to_string())),
836 }
837 } else {
838- smtp_chunk_err!(500, 0, 0, 0, "No such address")
839+ Err(smtp_response!(500, 0, 0, 0, "No such address"))
840 }
841 }
842 Request::Expn { value } => {
843 @@ -345,26 +392,32 @@ impl Session {
844 .iter()
845 .map(|addr| smtp_response!(250, 0, 0, 0, addr.to_string())),
846 );
847- Ok(Chunk(result))
848+ Ok(result)
849 }
850- Err(e) => Err(e.into()),
851+ Err(e) => Err(smtp_response!(500, 0, 0, 0, e.to_string())),
852 }
853 } else {
854- smtp_chunk_err!(500, 0, 0, 0, "Server does not support EXPN")
855+ Err(smtp_response!(500, 0, 0, 0, "Server does not support EXPN"))
856 }
857 }
858 Request::Help { value } => {
859 self.check_initialized()?;
860 if value.is_empty() {
861- smtp_chunk_ok!(250, 0, 0, 0, self.opts.help_banner.to_string())
862- } else {
863- smtp_chunk_ok!(
864+ Ok(vec![smtp_response!(
865 250,
866 0,
867 0,
868 0,
869+ self.opts.help_banner.to_string()
870+ )])
871+ } else {
872+ Err(smtp_response!(
873+ 500,
874+ 0,
875+ 0,
876+ 0,
877 format!("Help for {} is not currently available", value)
878- )
879+ ))
880 }
881 }
882 Request::Etrn { name } => todo!(),
883 @@ -373,53 +426,22 @@ impl Session {
884 Request::StartTls => todo!(),
885 Request::Data => {
886 self.check_initialized()?;
887- if let Some(transfer_mode) = &self.data_transfer {
888- match transfer_mode {
889- DataTransfer::Bdat => {
890- panic!("Transfer mode changed from BDAT to DATA")
891- }
892- DataTransfer::Data => {
893- let message_payload =
894- data.expect("data returned without a payload").to_vec();
895- self.check_body(&message_payload)?;
896- let parser = MessageParser::new();
897- match parser.parse(&message_payload) {
898- Some(msg) => {
899- self.body = Some(msg.into_owned());
900- self.data_transfer = None;
901- smtp_chunk_ok!(250, 0, 0, 0, "OK")
902- }
903- None => {
904- self.data_transfer = None;
905- smtp_chunk_err!(
906- 500,
907- 0,
908- 0,
909- 0,
910- "Cannot parse message payload".to_string()
911- )
912- }
913- }
914- }
915- }
916- } else {
917- tracing::info!("Initializing data transfer mode");
918- self.data_transfer = Some(DataTransfer::Data);
919- smtp_chunk_ok!(
920- 354,
921- 0,
922- 0,
923- 0,
924- "Reading data input, end the message with <CRLF>.<CRLF>".to_string()
925- )
926- }
927+ tracing::info!("Initializing data transfer mode");
928+ self.data_transfer = Some(DataTransfer::Data);
929+ Ok(vec![smtp_response!(
930+ 354,
931+ 0,
932+ 0,
933+ 0,
934+ "Reading data input, end the message with <CRLF>.<CRLF>".to_string()
935+ )])
936 }
937 Request::Rset => {
938 self.check_initialized()?;
939 self.reset();
940- smtp_chunk_ok!(200, 0, 0, 0, "".to_string())
941+ Ok(vec![smtp_response!(200, 0, 0, 0, "".to_string())])
942 }
943- Request::Quit => smtp_chunk_ok!(221, 0, 0, 0, "Ciao!".to_string()),
944+ Request::Quit => Ok(vec![smtp_response!(221, 0, 0, 0, "Ciao!".to_string())]),
945 }
946 }
947 }
948 @@ -447,7 +469,11 @@ mod test {
949 async move {
950 let mut session = session.lock().await;
951 println!("Running command {}/{}", i, commands.len());
952- let response = session.process(&command.request, command.payload.as_ref()).await;
953+ let response = if let Some(payload) = &command.payload {
954+ session.handle_data(payload).await
955+ } else {
956+ session.process(&command.request).await
957+ };
958 println!("Response: {:?}", response);
959 match response {
960 Ok(actual_response) => {
961 @@ -496,12 +522,18 @@ mod test {
962 host: EXAMPLE_HOSTNAME.to_string(),
963 },
964 payload: None,
965- expected: smtp_chunk_ok!(250, 0, 0, 0, String::from("Hello example.org")),
966+ expected: Ok(vec![smtp_response!(
967+ 250,
968+ 0,
969+ 0,
970+ 0,
971+ String::from("Hello example.org")
972+ )]),
973 },
974 TestCase {
975 request: Request::Quit {},
976 payload: None,
977- expected: smtp_chunk_ok!(221, 0, 0, 0, String::from("Ciao!")),
978+ expected: Ok(vec![smtp_response!(221, 0, 0, 0, String::from("Ciao!"))]),
979 },
980 ];
981 let session = Mutex::new(Session::default());
982 @@ -524,7 +556,13 @@ mod test {
983 },
984 },
985 payload: None,
986- expected: smtp_chunk_err!(500, 0, 0, 0, String::from("It's polite to say EHLO first")),
987+ expected: Err(smtp_response!(
988+ 500,
989+ 0,
990+ 0,
991+ 0,
992+ String::from("It's polite to say EHLO first")
993+ )),
994 }];
995 let session = Mutex::new(
996 Session::default()
997 @@ -541,23 +579,29 @@ mod test {
998 host: EXAMPLE_HOSTNAME.to_string(),
999 },
1000 payload: None,
1001- expected: smtp_chunk_ok!(250, 0, 0, 0, String::from("Hello example.org")),
1002+ expected: Ok(vec![smtp_response!(
1003+ 250,
1004+ 0,
1005+ 0,
1006+ 0,
1007+ String::from("Hello example.org")
1008+ )]),
1009 },
1010 TestCase {
1011 request: Request::Expn {
1012 value: "mailing-list".to_string(),
1013 },
1014 payload: None,
1015- expected: Ok(Chunk(vec![
1016+ expected: Ok(vec![
1017 smtp_response!(250, 0, 0, 0, "OK"),
1018 smtp_response!(250, 0, 0, 0, "Fuu <fuu@bar.com>"),
1019 smtp_response!(250, 0, 0, 0, "Baz <baz@qux.com>"),
1020- ])),
1021+ ]),
1022 },
1023 TestCase {
1024 request: Request::Quit {},
1025 payload: None,
1026- expected: smtp_chunk_ok!(221, 0, 0, 0, String::from("Ciao!")),
1027+ expected: Ok(vec![smtp_response!(221, 0, 0, 0, String::from("Ciao!"))]),
1028 },
1029 ];
1030 let session = Mutex::new(
1031 @@ -593,19 +637,25 @@ mod test {
1032 host: EXAMPLE_HOSTNAME.to_string(),
1033 },
1034 payload: None,
1035- expected: smtp_chunk_ok!(250, 0, 0, 0, String::from("Hello example.org")),
1036+ expected: Ok(vec![smtp_response!(
1037+ 250,
1038+ 0,
1039+ 0,
1040+ 0,
1041+ String::from("Hello example.org")
1042+ )]),
1043 },
1044 TestCase {
1045 request: Request::Vrfy {
1046 value: "Fuu <bar@baz.com>".to_string(),
1047 },
1048 payload: None,
1049- expected: Ok(Chunk(vec![smtp_response!(250, 0, 0, 0, "OK")])),
1050+ expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
1051 },
1052 TestCase {
1053 request: Request::Quit {},
1054 payload: None,
1055- expected: smtp_chunk_ok!(221, 0, 0, 0, String::from("Ciao!")),
1056+ expected: Ok(vec![smtp_response!(221, 0, 0, 0, String::from("Ciao!"))]),
1057 },
1058 ];
1059 let session = Mutex::new(
1060 @@ -641,18 +691,24 @@ mod test {
1061 host: EXAMPLE_HOSTNAME.to_string(),
1062 },
1063 payload: None,
1064- expected: smtp_chunk_ok!(250, 0, 0, 0, String::from("Hello example.org")),
1065+ expected: Ok(vec![smtp_response!(
1066+ 250,
1067+ 0,
1068+ 0,
1069+ 0,
1070+ String::from("Hello example.org")
1071+ )]),
1072 },
1073 TestCase {
1074 request: Request::Data {},
1075 payload: None,
1076- expected: smtp_chunk_ok!(
1077+ expected: Ok(vec![smtp_response!(
1078 354,
1079 0,
1080 0,
1081 0,
1082 "Reading data input, end the message with <CRLF>.<CRLF>"
1083- ),
1084+ )]),
1085 },
1086 TestCase {
1087 request: Request::Data {},
1088 @@ -662,13 +718,13 @@ mod test {
1089 "#
1090 .as_bytes(),
1091 )),
1092- expected: smtp_chunk_err!(
1093+ expected: Err(smtp_response!(
1094 500,
1095 0,
1096 0,
1097 0,
1098 "Non ascii characters found in message body"
1099- ),
1100+ )),
1101 },
1102 // upgrade the connection to extended mode
1103 TestCase {
1104 @@ -676,18 +732,18 @@ mod test {
1105 host: EXAMPLE_HOSTNAME.to_string(),
1106 },
1107 payload: None,
1108- expected: Ok(Chunk(vec![Response::Ehlo(expected_ehlo_response)])),
1109+ expected: Ok(vec![Response::Ehlo(expected_ehlo_response)]),
1110 },
1111 TestCase {
1112 request: Request::Data {},
1113 payload: None,
1114- expected: smtp_chunk_ok!(
1115+ expected: Ok(vec![smtp_response!(
1116 354,
1117 0,
1118 0,
1119 0,
1120 "Reading data input, end the message with <CRLF>.<CRLF>"
1121- ),
1122+ )]),
1123 },
1124 TestCase {
1125 request: Request::Data {},
1126 @@ -697,7 +753,7 @@ mod test {
1127 "#
1128 .as_bytes(),
1129 )),
1130- expected: smtp_chunk_ok!(250, 0, 0, 0, "OK"),
1131+ expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
1132 },
1133 ];
1134 let session = Mutex::new(
1135 @@ -719,7 +775,7 @@ mod test {
1136 host: EXAMPLE_HOSTNAME.to_string(),
1137 },
1138 payload: None,
1139- expected: smtp_chunk_ok!(250, 0, 0, 0, "Hello example.org"),
1140+ expected: Ok(vec![smtp_response!(250, 0, 0, 0, "Hello example.org")]),
1141 },
1142 TestCase {
1143 request: Request::Mail {
1144 @@ -729,7 +785,7 @@ mod test {
1145 },
1146 },
1147 payload: None,
1148- expected: smtp_chunk_ok!(250, 0, 0, 0, "OK"),
1149+ expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
1150 },
1151 TestCase {
1152 request: Request::Rcpt {
1153 @@ -739,19 +795,19 @@ mod test {
1154 },
1155 },
1156 payload: None,
1157- expected: smtp_chunk_ok!(250, 0, 0, 0, "OK"),
1158+ expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
1159 },
1160 // initiate data transfer
1161 TestCase {
1162 request: Request::Data {},
1163 payload: None,
1164- expected: smtp_chunk_ok!(
1165+ expected: Ok(vec![smtp_response!(
1166 354,
1167 0,
1168 0,
1169 0,
1170 "Reading data input, end the message with <CRLF>.<CRLF>"
1171- ),
1172+ )]),
1173 },
1174 // send the actual payload
1175 TestCase {
1176 @@ -765,7 +821,7 @@ Note that it doesn't end with a "." since that parsing happens as part of the
1177 transport rather than the session.
1178 "#,
1179 )),
1180- expected: smtp_chunk_ok!(250, 0, 0, 0, "OK"),
1181+ expected: Ok(vec![smtp_response!(250, 0, 0, 0, "OK")]),
1182 },
1183 ];
1184 let session = Mutex::new(
1185 diff --git a/maitred/src/transport.rs b/maitred/src/transport.rs
1186index a624a84..a01704a 100644
1187--- a/maitred/src/transport.rs
1188+++ b/maitred/src/transport.rs
1189 @@ -2,9 +2,25 @@ use std::{fmt::Display, io::Write};
1190
1191 use bytes::{Bytes, BytesMut};
1192 use smtp_proto::request::receiver::{BdatReceiver, DataReceiver, RequestReceiver};
1193+ use smtp_proto::Error as SmtpError;
1194 pub use smtp_proto::{EhloResponse, Request, Response as SmtpResponse};
1195 use tokio_util::codec::{Decoder, Encoder};
1196
1197+ #[derive(Debug, thiserror::Error)]
1198+ pub enum Error {
1199+ /// Returned when a client attempts to send multiple commands sequentially
1200+ /// to the server without waiting for a response but piplining isn't
1201+ /// enabled.
1202+ #[error("Piplining is not enabled")]
1203+ PipelineNotEnabled,
1204+ /// An error generated from the underlying SMTP protocol
1205+ #[error("Smtp failure: {0}")]
1206+ Smtp(#[from] SmtpError),
1207+ /// An IO related error such as not being able to bind to a TCP socket
1208+ #[error("Io: {0}")]
1209+ Io(#[from] std::io::Error),
1210+ }
1211+
1212 #[derive(Debug, Clone)]
1213 pub enum Response<T>
1214 where
1215 @@ -63,20 +79,34 @@ pub(crate) enum Receiver {
1216
1217 /// Command from the client with an optional attached payload.
1218 #[derive(Debug)]
1219- pub(crate) struct Command(pub Request<String>, pub Option<Bytes>);
1220+ pub(crate) enum Command {
1221+ Requests(Vec<Request<String>>),
1222+ Payload(Bytes),
1223+ }
1224
1225 impl Display for Command {
1226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1227- write!(f, "{:?}", self.0)
1228+ match self {
1229+ Command::Requests(requests) => write!(f, "{:?}", requests),
1230+ Command::Payload(payload) => write!(f, "Bytes ({})", payload.len()),
1231+ }
1232 }
1233 }
1234
1235- /// SMTP Transport
1236 #[derive(Default)]
1237 pub(crate) struct Transport {
1238 receiver: Option<Box<Receiver>>,
1239 prev: Option<Request<String>>,
1240 buf: Vec<u8>,
1241+ pipelining: bool,
1242+ }
1243+
1244+ impl Transport {
1245+ /// If the transport should allow piplining commands
1246+ pub fn pipelining(mut self, enabled: bool) -> Self {
1247+ self.pipelining = enabled;
1248+ self
1249+ }
1250 }
1251
1252 impl Encoder<Response<String>> for Transport {
1253 @@ -97,12 +127,14 @@ impl Encoder<Response<String>> for Transport {
1254
1255 impl Decoder for Transport {
1256 type Item = Command;
1257- type Error = crate::Error;
1258+ type Error = Error;
1259
1260 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1261 if src.is_empty() {
1262+ tracing::debug!("Empty command received");
1263 return Ok(None);
1264 }
1265+
1266 if let Some(rec) = self.receiver.as_mut() {
1267 let chunk_size = src.len();
1268 tracing::debug!("Reading {} bytes of data stream", chunk_size);
1269 @@ -114,13 +146,7 @@ impl Decoder for Transport {
1270 let payload = Bytes::copy_from_slice(&self.buf);
1271 self.buf.clear();
1272 self.receiver = None;
1273- return Ok(Some(Command(
1274- self.prev
1275- .as_ref()
1276- .expect("missing previous command")
1277- .clone(),
1278- Some(payload),
1279- )));
1280+ return Ok(Some(Command::Payload(payload)));
1281 } else {
1282 return Ok(None);
1283 }
1284 @@ -132,13 +158,7 @@ impl Decoder for Transport {
1285 let payload = Bytes::copy_from_slice(&self.buf);
1286 self.buf.clear();
1287 self.receiver = None;
1288- return Ok(Some(Command(
1289- self.prev
1290- .as_ref()
1291- .expect("missing previous command")
1292- .clone(),
1293- Some(payload),
1294- )));
1295+ return Ok(Some(Command::Payload(payload)));
1296 } else {
1297 return Ok(None);
1298 }
1299 @@ -147,28 +167,49 @@ impl Decoder for Transport {
1300 };
1301
1302 let mut r = RequestReceiver::default();
1303- let buf = src.split_to(src.len());
1304- let request = r.ingest(&mut buf.iter(), buf.to_vec().as_slice())?;
1305- self.prev = Some(request.clone());
1306- match request {
1307+ let mut requests: Vec<Request<String>> = Vec::new();
1308+ let mut iter = src.iter();
1309+ 'outer: loop {
1310+ match r.ingest(&mut iter, src) {
1311+ Ok(request) => {
1312+ if !requests.is_empty() && !self.pipelining {
1313+ return Err(Error::PipelineNotEnabled)
1314+ }
1315+ requests.push(request);
1316+ }
1317+ Err(err) => {
1318+ if matches!(err, smtp_proto::Error::NeedsMoreData { bytes_left: _ }) {
1319+ break 'outer;
1320+ } else {
1321+ return Err(Error::Smtp(err));
1322+ }
1323+ }
1324+ }
1325+ }
1326+
1327+ src.clear();
1328+
1329+ let last = requests.last().expect("No data parsed");
1330+ match last {
1331 Request::Bdat {
1332 chunk_size,
1333 is_last,
1334 } => {
1335 tracing::info!("Starting binary data transfer");
1336 self.receiver = Some(Box::new(Receiver::Bdat(BdatReceiver::new(
1337- chunk_size, is_last,
1338+ *chunk_size,
1339+ *is_last,
1340 ))));
1341 self.buf.clear();
1342- Ok(Some(Command(request, None)))
1343+ Ok(Some(Command::Requests(requests)))
1344 }
1345 Request::Data => {
1346 tracing::info!("Starting data transfer");
1347 self.receiver = Some(Box::new(Receiver::Data(DataReceiver::new())));
1348 self.buf.clear();
1349- Ok(Some(Command(request, None)))
1350+ Ok(Some(Command::Requests(requests)))
1351 }
1352- _ => Ok(Some(Command(request, None))),
1353+ _ => Ok(Some(Command::Requests(requests))),
1354 }
1355 }
1356 }
1357 diff --git a/maitred/src/worker.rs b/maitred/src/worker.rs
1358index 6df7eef..7865808 100644
1359--- a/maitred/src/worker.rs
1360+++ b/maitred/src/worker.rs
1361 @@ -21,8 +21,8 @@ pub(crate) struct Packet {
1362 pub hostname: Option<Host>,
1363 }
1364
1365- impl From<Session> for Packet {
1366- fn from(value: Session) -> Self {
1367+ impl From<&Session> for Packet {
1368+ fn from(value: &Session) -> Self {
1369 Packet {
1370 body: value.body.clone(),
1371 mail_from: value.mail_from.clone(),