Commit
Author: Kevin Schoon [me@kevinschoon.com]
Hash: e3a3eb9fe9613c071a1daf72f66574442b0c30e2
Timestamp: Mon, 29 Jul 2024 22:47:20 +0000 (5 months ago)

+51 -32 +/-3 browse
add global timeout when waiting for client data
1diff --git a/maitred/src/error.rs b/maitred/src/error.rs
2index f6a2bd1..afee461 100644
3--- a/maitred/src/error.rs
4+++ b/maitred/src/error.rs
5 @@ -17,5 +17,7 @@ pub enum Error {
6 #[error("Failed to parse Url: {0}")]
7 UrlParsing(#[from] ParseError),
8 #[error("Failed to read UTF8: {0}")]
9- Utf8(#[from] FromUtf8Error)
10+ Utf8(#[from] FromUtf8Error),
11+ #[error("Client took too long to respond: {0}s")]
12+ Timeout(u64),
13 }
14 diff --git a/maitred/src/server.rs b/maitred/src/server.rs
15index 3b68bb3..72c47d0 100644
16--- a/maitred/src/server.rs
17+++ b/maitred/src/server.rs
18 @@ -2,7 +2,7 @@ use std::time::{Duration, Instant};
19
20 use futures::SinkExt;
21 use smtp_proto::Request;
22- use tokio::net::TcpListener;
23+ use tokio::{net::TcpListener, time::timeout};
24 use tokio_stream::StreamExt;
25 use tokio_util::codec::Framed;
26
27 @@ -79,8 +79,6 @@ impl Server {
28 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin,
29 {
30 let mut session = Session::default();
31- let start_time = Instant::now();
32- let mut n_commands = 0;
33 // send inital server greeting
34 framed
35 .send(crate::session::greeting(
36 @@ -88,32 +86,49 @@ impl Server {
37 &self.config.greeting,
38 ))
39 .await?;
40- let mut finished = false;
41- while let Some(request) = framed.next().await {
42- match request {
43- Ok(command) => {
44- if matches!(command.0, Request::Quit) {
45- finished = true;
46- }
47- match session.process(&opts, &command.0, command.1) {
48- Ok(resp) => {
49- tracing::debug!("Returning response: {:?}", resp);
50- framed.send(resp).await?;
51- }
52- Err(err) => {
53- tracing::warn!("Client error: {:?}", err);
54- framed.send(err).await?;
55+ 'outer: loop {
56+ match timeout(self.config.global_timeout, framed.next()).await {
57+ Ok(request) => {
58+ if let Some(command) = request {
59+ let mut finished = false;
60+ match command {
61+ Ok(command) => {
62+ if matches!(command.0, Request::Quit) {
63+ finished = true;
64+ }
65+ match session.process(&opts, &command.0, command.1) {
66+ Ok(resp) => {
67+ tracing::debug!("Returning response: {:?}", resp);
68+ framed.send(resp).await?;
69+ }
70+ Err(err) => {
71+ tracing::warn!("Client error: {:?}", err);
72+ framed.send(err).await?;
73+ }
74+ };
75+ if finished {
76+ break 'outer;
77+ }
78+ }
79+ Err(err) => {
80+ tracing::warn!("Socket closed with error: {:?}", err);
81+ return Err(err);
82+ }
83 }
84+ } else {
85+ break 'outer;
86 };
87 }
88- Err(err) => {
89- tracing::warn!("Socket closed with error: {:?}", err);
90- return Err(err);
91+ Err(timeout) => {
92+ tracing::warn!(
93+ "Client connection exceeded: {:?}",
94+ self.config.global_timeout
95+ );
96+ framed
97+ .send(crate::session::timeout(&timeout.to_string()))
98+ .await?;
99+ return Err(Error::Timeout(self.config.global_timeout.as_secs()));
100 }
101- };
102-
103- if finished {
104- break;
105 }
106 }
107 tracing::info!("Connection closed");
108 @@ -125,6 +140,8 @@ impl Server {
109 tracing::info!("Mail server listening @ {}", self.config.address);
110 loop {
111 let (socket, _) = listener.accept().await.unwrap();
112+ let addr = socket.local_addr()?;
113+ tracing::info!("Accepted connection on: {:?}", addr);
114 let framed = Framed::new(socket, Transport::default());
115 if let Err(err) = self
116 .process(
117 diff --git a/maitred/src/session.rs b/maitred/src/session.rs
118index 7d06d19..80d5578 100644
119--- a/maitred/src/session.rs
120+++ b/maitred/src/session.rs
121 @@ -1,15 +1,11 @@
122 use std::result::Result as StdResult;
123- use std::time::{Duration, Instant};
124
125 use bytes::Bytes;
126- use mail_parser::{Addr, Message, MessageParser};
127+ use mail_parser::MessageParser;
128 use melib::Address;
129 use smtp_proto::{Request, Response};
130 use url::Host;
131
132- use crate::error::Error;
133- use crate::transport::Command;
134-
135 pub type Result = StdResult<Response<String>, Response<String>>;
136
137 enum DataTransfer {
138 @@ -23,12 +19,16 @@ pub fn greeting(hostname: &str, greeting: &str) -> Response<String> {
139 Response::new(220, 2, 0, 0, format!("{} {}", hostname, greeting))
140 }
141
142+ /// Sent when the connection exceeds the maximum configured timeout
143+ pub fn timeout(message: &str) -> Response<String> {
144+ Response::new(421, 4, 4, 2, format!("Timeout exceeded: {}", message))
145+ }
146+
147 /// Runtime options that influence server behavior
148 pub(crate) struct Options {
149 pub hostname: String,
150 }
151
152-
153 /// Stateful connection that coresponds to a single SMTP session
154 #[derive(Default)]
155 pub(crate) struct Session {