Commit
Author: Kevin Schoon [me@kevinschoon.com]
Hash: 8bae5abc6d179cd6bb9e81918c68fe0ea2dc91f8
Timestamp: Sat, 31 Aug 2024 12:49:56 +0000 (3 months ago)

+25 -14 +/-3 browse
fixup transport error handling
1diff --git a/maitred/src/error.rs b/maitred/src/error.rs
2index 6fc07a3..b43c80f 100644
3--- a/maitred/src/error.rs
4+++ b/maitred/src/error.rs
5 @@ -5,7 +5,4 @@ pub enum Error {
6 /// An IO related error such as not being able to bind to a TCP socket
7 #[error("Io: {0}")]
8 Io(#[from] std::io::Error),
9- /// Session timeout
10- #[error("Client took too long to respond: {0}s")]
11- Timeout(u64),
12 }
13 diff --git a/maitred/src/server.rs b/maitred/src/server.rs
14index 4a5517f..2906b8e 100644
15--- a/maitred/src/server.rs
16+++ b/maitred/src/server.rs
17 @@ -21,7 +21,7 @@ use crate::error::Error;
18 use crate::milter::Milter;
19 use crate::session::{Session, SessionOptions};
20 use crate::smtp_response;
21- use crate::transport::{Command, Transport};
22+ use crate::transport::{Command, Transport, TransportError};
23 use crate::worker::{Packet, Worker};
24 use crate::{Response, SmtpResponse};
25
26 @@ -38,6 +38,20 @@ fn is_quit(reqs: &[Request<String>]) -> bool {
27 reqs.last().is_some_and(|req| matches!(req, Request::Quit))
28 }
29
30+ /// Top level error encountered while processing a client connection, causes
31+ /// a warning to be logged but is not fatal.
32+ #[derive(Debug, thiserror::Error)]
33+ pub(crate) enum ClientError {
34+ /// An IO related error such as not being able to bind to a TCP socket
35+ #[error("Io: {0}")]
36+ Io(#[from] std::io::Error),
37+ #[error("Transport Error: {0}")]
38+ Transport(#[from] TransportError),
39+ /// Session timeout
40+ #[error("Client took too long to respond: {0}s")]
41+ Timeout(u64),
42+ }
43+
44 /// Server implements everything that is required to run an SMTP server by
45 /// binding to the configured address and processing individual TCP connections
46 /// as they are received.
47 @@ -110,7 +124,7 @@ impl Server {
48 &self,
49 mut framed: Framed<T, Transport>,
50 msg_queue: Arc<Injector<Packet>>,
51- ) -> Result<(), Error>
52+ ) -> Result<(), ClientError>
53 where
54 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin,
55 {
56 @@ -161,10 +175,10 @@ impl Server {
57 Ok(Some(Err(err))) => {
58 tracing::warn!("Client Error: {}", err);
59 let response = match err {
60- crate::transport::Error::PipelineNotEnabled => {
61+ crate::transport::TransportError::PipelineNotEnabled => {
62 crate::smtp_response!(500, 0, 0, 0, "Pipelining is not enabled")
63 }
64- crate::transport::Error::Smtp(e) => {
65+ crate::transport::TransportError::Smtp(e) => {
66 match e {
67 smtp_proto::Error::NeedsMoreData { bytes_left: _ } => {
68 // TODO
69 @@ -201,7 +215,7 @@ impl Server {
70 }
71 }
72 // IO Errors considered fatal for the entire session
73- crate::transport::Error::Io(e) => return Err(Error::Io(e)),
74+ crate::transport::TransportError::Io(e) => return Err(ClientError::Io(e)),
75 };
76 framed.send(response).await?;
77 }
78 @@ -214,7 +228,7 @@ impl Server {
79 framed
80 .send(crate::session::timeout(&timeout.to_string()))
81 .await?;
82- return Err(Error::Timeout(self.global_timeout.as_secs()));
83+ return Err(ClientError::Timeout(self.global_timeout.as_secs()));
84 }
85 }
86 }
87 diff --git a/maitred/src/transport.rs b/maitred/src/transport.rs
88index e35f8a8..74e3c4c 100644
89--- a/maitred/src/transport.rs
90+++ b/maitred/src/transport.rs
91 @@ -7,7 +7,7 @@ pub use smtp_proto::{EhloResponse, Request, Response as SmtpResponse};
92 use tokio_util::codec::{Decoder, Encoder};
93
94 #[derive(Debug, thiserror::Error)]
95- pub enum Error {
96+ pub(crate) enum TransportError {
97 /// Returned when a client attempts to send multiple commands sequentially
98 /// to the server without waiting for a response but piplining isn't
99 /// enabled.
100 @@ -109,7 +109,7 @@ impl Transport {
101 }
102
103 impl Encoder<Response<String>> for Transport {
104- type Error = crate::Error;
105+ type Error = TransportError;
106
107 fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> Result<(), Self::Error> {
108 match item {
109 @@ -126,7 +126,7 @@ impl Encoder<Response<String>> for Transport {
110
111 impl Decoder for Transport {
112 type Item = Command;
113- type Error = Error;
114+ type Error = TransportError;
115
116 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
117 if src.is_empty() {
118 @@ -172,7 +172,7 @@ impl Decoder for Transport {
119 match r.ingest(&mut iter, src) {
120 Ok(request) => {
121 if !requests.is_empty() && !self.pipelining {
122- return Err(Error::PipelineNotEnabled)
123+ return Err(TransportError::PipelineNotEnabled)
124 }
125 requests.push(request);
126 }
127 @@ -180,7 +180,7 @@ impl Decoder for Transport {
128 if matches!(err, smtp_proto::Error::NeedsMoreData { bytes_left: _ }) {
129 break 'outer;
130 } else {
131- return Err(Error::Smtp(err));
132+ return Err(TransportError::Smtp(err));
133 }
134 }
135 }