Author: Kevin Schoon [me@kevinschoon.com]
Hash: c15bd7e7013203568d9bcb6df34174eb54e680f2
Timestamp: Fri, 02 Aug 2024 22:56:39 +0000 (2 months ago)

+277 -50 +/-6 browse
implement pipeline module
1diff --git a/README.md b/README.md
2index ea546eb..798d858 100644
3--- a/README.md
4+++ b/README.md
5 @@ -49,7 +49,7 @@ for _absolutely nothing_ that is important.
6 | Name | Status | RFC |
7 |-----------------------|----------|-------------------------------|
8 | SIZE | TODO | [RFC1870](rfcs/rfc1870.txt) |
9- | PIPELINING | TODO | [RFC2920](rfcs/rc2920.txt) |
10+ | PIPELINING | TODO | [RFC2920](rfcs/rfc2920.txt) |
11 | 8BITMIME | TODO | [RFC6152](rfcs/rfc6152.txt) |
12 | ENHANCED STATUS CODES | TODO | [RFC2920](rfcs/rfc3463.txt) |
13 | SMTPUTF8 | TODO | [RFC6531](rfcs/rfc6531.txt) |
14 diff --git a/maitred/src/lib.rs b/maitred/src/lib.rs
15index 6da0b2e..18eac88 100644
16--- a/maitred/src/lib.rs
17+++ b/maitred/src/lib.rs
18 @@ -1,11 +1,50 @@
19 mod error;
20+ mod pipeline;
21 mod server;
22 mod session;
23 mod transport;
24
25+ use smtp_proto::{Request, Response as SmtpResponse};
26+ use transport::Response;
27+
28 /// Low Level SMTP protocol is exported for convenience
29 pub use smtp_proto;
30
31 pub use error::Error;
32 pub use server::Server;
33
34+ /// Generate an SMTP response
35+ #[macro_export(local_inner_macros)]
36+ macro_rules! smtp_response {
37+ ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
38+ Response::General(SmtpResponse::new($code, $e1, $e2, $e3, $name.to_string()))
39+ };
40+ }
41+
42+ /// Generate a successful SMTP response
43+ #[macro_export(local_inner_macros)]
44+ macro_rules! smtp_ok {
45+ ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
46+ Ok::<Response<String>, Response<String>>(Response::General(SmtpResponse::new(
47+ $code,
48+ $e1,
49+ $e2,
50+ $e3,
51+ $name.to_string(),
52+ )))
53+ };
54+ }
55+
56+ /// Generate an SMTP response error
57+ #[macro_export(local_inner_macros)]
58+ macro_rules! smtp_err {
59+ ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
60+ Err::<Response<String>, Response<String>>(Response::General(SmtpResponse::new(
61+ $code,
62+ $e1,
63+ $e2,
64+ $e3,
65+ $name.to_string(),
66+ )))
67+ };
68+ }
69 diff --git a/maitred/src/pipeline.rs b/maitred/src/pipeline.rs
70new file mode 100644
71index 0000000..b791b4c
72--- /dev/null
73+++ b/maitred/src/pipeline.rs
74 @@ -0,0 +1,209 @@
75+ use std::result::Result as StdResult;
76+
77+ use crate::session::{Result as SessionResult, Session};
78+ use crate::{smtp_err, smtp_ok, Request, SmtpResponse};
79+
80+ pub type Result = StdResult<Option<Vec<SessionResult>>, Vec<SessionResult>>;
81+ pub type Transaction = (Request<String>, SessionResult);
82+
83+ /// Pipeline chunks session request/responses into logical groups returning
84+ /// a response only once a session is considered "completed".
85+ #[derive(Default)]
86+ pub struct Pipeline {
87+ history: Vec<Transaction>,
88+ disable: bool,
89+ }
90+
91+ impl Pipeline {
92+ /// disable pipelining and return each each transaction transparently
93+ pub fn disable(mut self) -> Self {
94+ self.disable = true;
95+ self
96+ }
97+
98+ /// Checks if the pipeline is within a data transaction (if the previous
99+ /// command was DATA/BDAT).
100+ fn within_tx(&self) -> bool {
101+ self.history
102+ .last()
103+ .map(|tx| {
104+ matches!(tx.0, Request::Data)
105+ || matches!(
106+ tx.0,
107+ Request::Bdat {
108+ chunk_size: _,
109+ is_last: _
110+ }
111+ )
112+ })
113+ .is_some_and(|is_tx| is_tx)
114+ }
115+
116+ fn chunk(&mut self) -> Result {
117+ let mail_from_ok = self.history.iter().fold(false, |ok, tx| {
118+ if matches!(&tx.0, Request::Mail { from: _ }) {
119+ return tx.1.is_ok();
120+ }
121+ ok
122+ });
123+ let rcpt_to_ok_count = self.history.iter().fold(0, |count, tx| {
124+ if matches!(&tx.0, Request::Rcpt { to: _ }) && tx.1.is_ok() {
125+ return count + 1;
126+ }
127+ count
128+ });
129+ let last_command = self
130+ .history
131+ .last()
132+ .expect("to results called without history");
133+ if last_command.1.is_ok() && mail_from_ok && rcpt_to_ok_count > 0 {
134+ Ok(Some(self.history.iter().map(|tx| tx.1.clone()).collect()))
135+ } else if !mail_from_ok {
136+ self.history.pop();
137+ Err(self.history.iter().map(|tx| tx.1.clone()).collect())
138+ } else if !rcpt_to_ok_count <= 0 {
139+ self.history.pop();
140+ Err(self.history.iter().map(|tx| tx.1.clone()).collect())
141+ } else {
142+ Err(self.history.iter().map(|tx| tx.1.clone()).collect())
143+ }
144+ }
145+
146+ /// Per RFC 2920:
147+ /// Once the client SMTP has confirmed that support exists for the
148+ /// pipelining extension, the client SMTP may then elect to transmit
149+ /// groups of SMTP commands in batches without waiting for a response to
150+ /// each individual command. In particular, the commands RSET, MAIL FROM,
151+ /// SEND FROM, SOML FROM, SAML FROM, and RCPT TO can all appear anywhere
152+ /// in a pipelined command group. The EHLO, DATA, VRFY, EXPN, TURN,
153+ /// QUIT, and NOOP commands can only appear as the last command in a
154+ /// group since their success or failure produces a change of state which
155+ /// the client SMTP must accommodate. (NOOP is included in this group so
156+ /// it can be used as a synchronization point.)
157+ ///
158+ /// Process the next session result in the pipeline and return a vector of
159+ /// results if the command requires a server response. Returns true if
160+ /// the process was successful or false if it contained fatal errors. All
161+ /// results must be passed to the client.
162+ pub fn process(&mut self, req: &Request<String>, res: &SessionResult) -> Result {
163+ let is_data_tx = self.within_tx();
164+ if is_data_tx {
165+ // ignore the first data request
166+ self.history.pop();
167+ self.history.push((req.clone(), res.clone()));
168+ } else {
169+ self.history.push((req.clone(), res.clone()));
170+ }
171+ match req {
172+ Request::Ehlo { host: _ } => {
173+ self.history.clear();
174+ Ok(Some(vec![res.clone()]))
175+ }
176+ Request::Lhlo { host: _ } => {
177+ self.history.clear();
178+ Ok(Some(vec![res.clone()]))
179+ }
180+ Request::Helo { host: _ } => {
181+ self.history.clear();
182+ Ok(Some(vec![res.clone()]))
183+ }
184+ Request::Mail { from: _ } => Ok(None),
185+ Request::Rcpt { to: _ } => Ok(None),
186+ Request::Bdat {
187+ chunk_size: _,
188+ is_last: _,
189+ } => {
190+ if is_data_tx {
191+ let chunk = self.chunk();
192+ self.history.clear();
193+ chunk
194+ } else {
195+ Ok(None)
196+ }
197+ }
198+ Request::Auth {
199+ mechanism: _,
200+ initial_response: _,
201+ } => todo!(),
202+ Request::Noop { value: _ } => Ok(Some(vec![res.clone()])),
203+ Request::Vrfy { value: _ } => todo!(),
204+ Request::Expn { value: _ } => todo!(),
205+ Request::Help { value: _ } => Ok(Some(vec![res.clone()])),
206+ Request::Etrn { name: _ } => todo!(),
207+ Request::Atrn { domains: _ } => todo!(),
208+ Request::Burl { uri: _, is_last: _ } => todo!(),
209+ Request::StartTls => todo!(),
210+ Request::Data => {
211+ if is_data_tx {
212+ let chunk = self.chunk();
213+ self.history.clear();
214+ chunk
215+ } else {
216+ Ok(None)
217+ }
218+ }
219+ Request::Rset => {
220+ self.history.clear();
221+ Ok(Some(vec![res.clone()]))
222+ }
223+ Request::Quit => Ok(Some(vec![res.clone()])),
224+ }
225+ }
226+ }
227+
228+ #[cfg(test)]
229+ mod test {
230+
231+ use super::*;
232+ use crate::{smtp_err, smtp_ok, Request, Response, SmtpResponse};
233+
234+ #[test]
235+ pub fn test_pipeline_basic() {
236+ let mut pipeline = Pipeline::default();
237+ assert!(pipeline
238+ .process(
239+ &Request::Helo {
240+ host: "example.org".to_string(),
241+ },
242+ &smtp_ok!(200, 0, 0, 0, "OK")
243+ )
244+ .is_ok_and(|responses| responses.is_some_and(|responses| responses.len() == 1)));
245+ // batchable commands out of order
246+ assert!(pipeline
247+ .process(
248+ &Request::Rcpt {
249+ to: smtp_proto::RcptTo {
250+ address: "baz@qux.com".to_string(),
251+ ..Default::default()
252+ },
253+ },
254+ &smtp_ok!(200, 0, 0, 0, "OK: baz@qux.com")
255+ )
256+ .is_ok_and(|responses| responses.is_none()));
257+ assert!(pipeline
258+ .process(
259+ &Request::Mail {
260+ from: smtp_proto::MailFrom {
261+ address: "fuu@bar.com".to_string(),
262+ ..Default::default()
263+ }
264+ },
265+ &smtp_ok!(200, 0, 0, 0, "OK: fuu@bar.com")
266+ )
267+ .is_ok_and(|responses| responses.is_none()));
268+ // initialize a data request
269+ assert!(pipeline
270+ .process(&Request::Data {}, &smtp_ok!(200, 0, 0, 0, "OK"))
271+ .is_ok_and(|responses| responses.is_none()));
272+ // simulate the end of a request
273+ let result = pipeline.process(&Request::Data {}, &smtp_ok!(200, 0, 0, 0, "OK"));
274+ assert!(
275+ result.is_ok_and(|responses| responses.is_some_and(|responses| {
276+ responses.len() == 3
277+ && responses[0].is_ok()
278+ && responses[1].is_ok()
279+ && responses[2].is_ok()
280+ }))
281+ );
282+ }
283+ }
284 diff --git a/maitred/src/server.rs b/maitred/src/server.rs
285index 84014ba..677581f 100644
286--- a/maitred/src/server.rs
287+++ b/maitred/src/server.rs
288 @@ -1,6 +1,7 @@
289 use std::time::Duration;
290
291 use futures::SinkExt;
292+ use melib::uuid::Bytes;
293 use smtp_proto::Request;
294 use tokio::{net::TcpListener, time::timeout};
295 use tokio_stream::StreamExt;
296 @@ -8,7 +9,7 @@ use tokio_util::codec::Framed;
297
298 use crate::error::Error;
299 use crate::session::{Options as SessionOptions, Session};
300- use crate::transport::Transport;
301+ use crate::transport::{Command, Transport};
302
303 const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:2525";
304 const DEFAULT_GREETING: &str = "Maitred ESMTP Server";
305 @@ -34,8 +35,8 @@ const DEFAULT_MAXIMUM_SIZE: u64 = 5_000_000;
306 // 250-SMTPUTF8
307 // 250 CHUNKING
308
309-
310- const DEFAULT_CAPABILITIES: u32 = smtp_proto::EXT_SIZE + smtp_proto::EXT_ENHANCED_STATUS_CODES;
311+ const DEFAULT_CAPABILITIES: u32 =
312+ smtp_proto::EXT_SIZE + smtp_proto::EXT_ENHANCED_STATUS_CODES + smtp_proto::EXT_PIPELINING;
313
314 #[derive(Clone)]
315 struct Configuration {
316 @@ -108,6 +109,8 @@ impl Server {
317 self
318 }
319
320+ /// Set the maximum size of a message which if exceeded will result in
321+ /// rejection.
322 pub fn with_maximum_size(mut self, size: u64) -> Self {
323 self.config.maximum_size = size;
324 self
325 @@ -129,8 +132,10 @@ impl Server {
326 &self.config.greeting,
327 ))
328 .await?;
329+
330 'outer: loop {
331- match timeout(self.config.global_timeout, framed.next()).await {
332+ let frame = timeout(self.config.global_timeout, framed.next()).await;
333+ match frame {
334 Ok(request) => {
335 if let Some(command) = request {
336 let mut finished = false;
337 diff --git a/maitred/src/session.rs b/maitred/src/session.rs
338index 31d05dd..65ded28 100644
339--- a/maitred/src/session.rs
340+++ b/maitred/src/session.rs
341 @@ -1,44 +1,13 @@
342 use std::result::Result as StdResult;
343
344- use crate::transport::Response;
345 use bytes::Bytes;
346 use mail_parser::MessageParser;
347 use melib::Address;
348 use smtp_proto::{EhloResponse, Request, Response as SmtpResponse};
349 use url::Host;
350
351- /// Generate an SMTP response
352- macro_rules! smtp_response {
353- ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
354- Response::General(SmtpResponse::new($code, $e1, $e2, $e3, $name.to_string()))
355- };
356- }
357-
358- /// Generate a successful SMTP response
359- macro_rules! smtp_ok {
360- ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
361- Ok::<Response<String>, Response<String>>(Response::General(SmtpResponse::new(
362- $code,
363- $e1,
364- $e2,
365- $e3,
366- $name.to_string(),
367- )))
368- };
369- }
370-
371- /// Generate an SMTP response error
372- macro_rules! smtp_err {
373- ($code:expr, $e1:expr, $e2:expr, $e3:expr, $name:expr) => {
374- Err::<Response<String>, Response<String>>(Response::General(SmtpResponse::new(
375- $code,
376- $e1,
377- $e2,
378- $e3,
379- $name.to_string(),
380- )))
381- };
382- }
383+ use crate::transport::Response;
384+ use crate::{smtp_err, smtp_ok, smtp_response};
385
386 /// Result generated as part of an SMTP session, an Err indicates a session
387 /// level error that will be returned to the client.
388 @@ -79,9 +48,9 @@ pub(crate) struct Session {
389 /// message body
390 pub body: Option<Vec<u8>>,
391 /// mailto address
392- pub mail_to: Option<Address>,
393+ pub mail_from: Option<Address>,
394 /// rcpt address
395- pub rcpt_to: Option<Address>,
396+ pub rcpt_to: Option<Vec<Address>>,
397 pub hostname: Option<Host>,
398 /// If an active data transfer is taking place
399 data_transfer: Option<DataTransfer>,
400 @@ -91,9 +60,10 @@ impl Session {
401 pub fn reset(&mut self) {
402 self.history.clear();
403 self.body = None;
404- self.mail_to = None;
405+ self.mail_from = None;
406 self.rcpt_to = None;
407- self.hostname = None;
408+ // FIXME: is the hostname reset?
409+ // self.hostname = None;
410 self.data_transfer = None;
411 }
412
413 @@ -134,7 +104,7 @@ impl Session {
414 smtp_ok!(250, 0, 0, 0, format!("Hello {}", host))
415 }
416 Request::Mail { from } => {
417- let mail_to = Address::try_from(from.address.as_str()).map_err(|e| {
418+ let mail_from = Address::try_from(from.address.as_str()).map_err(|e| {
419 smtp_response!(
420 500,
421 0,
422 @@ -143,14 +113,18 @@ impl Session {
423 format!("cannot parse: {} {}", from.address, e)
424 )
425 })?;
426- self.mail_to = Some(mail_to.clone());
427+ self.mail_from = Some(mail_from.clone());
428 smtp_ok!(250, 0, 0, 0, "OK")
429 }
430 Request::Rcpt { to } => {
431- let mail_to = Address::try_from(to.address.as_str()).map_err(|e| {
432+ let rcpt_to = Address::try_from(to.address.as_str()).map_err(|e| {
433 smtp_response!(500, 0, 0, 0, format!("Cannot parse: {} {}", to.address, e))
434 })?;
435- self.mail_to = Some(mail_to.clone());
436+ if let Some(ref mut rcpts) = self.rcpt_to {
437+ rcpts.push(rcpt_to.clone());
438+ } else {
439+ self.rcpt_to = Some(vec![rcpt_to.clone()]);
440+ }
441 smtp_ok!(250, 0, 0, 0, "OK")
442 }
443 Request::Bdat {
444 @@ -191,7 +165,7 @@ impl Session {
445 mechanism,
446 initial_response,
447 } => todo!(),
448- Request::Noop { value: _ } => smtp_ok!(250, 0, 0, 0, "OK".to_string(),),
449+ Request::Noop { value: _ } => smtp_ok!(250, 0, 0, 0, "OK".to_string()),
450 Request::Vrfy { value } => todo!(),
451 Request::Expn { value } => todo!(),
452 Request::Help { value } => {
453 diff --git a/maitred/src/transport.rs b/maitred/src/transport.rs
454index b69ac39..a624a84 100644
455--- a/maitred/src/transport.rs
456+++ b/maitred/src/transport.rs
457 @@ -5,7 +5,7 @@ use smtp_proto::request::receiver::{BdatReceiver, DataReceiver, RequestReceiver}
458 pub use smtp_proto::{EhloResponse, Request, Response as SmtpResponse};
459 use tokio_util::codec::{Decoder, Encoder};
460
461- #[derive(Debug)]
462+ #[derive(Debug, Clone)]
463 pub enum Response<T>
464 where
465 T: Display,
466 @@ -33,8 +33,8 @@ where
467 && req.deliver_by == other.deliver_by
468 && req.size == other.size
469 && req.auth_mechanisms == other.auth_mechanisms
470- && req.future_release_datetime == req.future_release_datetime
471- && req.future_release_interval == req.future_release_interval
472+ && req.future_release_datetime.eq(&req.future_release_datetime)
473+ && req.future_release_interval.eq(&req.future_release_interval)
474 }
475 },
476 }