Commit
Author: Kevin Schoon [me@kevinschoon.com]
Hash: 124e5aa1671dd07c58c9ae2dcf19146efb94a2d9
Timestamp: Tue, 07 Jan 2025 00:25:45 +0000 (2 weeks ago)

+362 -135 +/-6 browse
wrap up first relay implementation
1diff --git a/maitred/Cargo.toml b/maitred/Cargo.toml
2index a3426ff..74ba7e8 100644
3--- a/maitred/Cargo.toml
4+++ b/maitred/Cargo.toml
5 @@ -41,7 +41,7 @@ tracing-subscriber = "0.3.18"
6
7 [features]
8 default = []
9- full = ["auth", "client", "server"]
10+ full = ["auth", "relay", "server"]
11 auth = [
12 "base64",
13 "server"
14 @@ -55,7 +55,8 @@ server = [
15 "tokio-stream",
16 "tokio-util"
17 ]
18- client = [
19+ relay = [
20 "hickory-resolver",
21- "lettre"
22+ "lettre",
23+ "rustls"
24 ]
25 diff --git a/maitred/examples/relay.rs b/maitred/examples/relay.rs
26new file mode 100644
27index 0000000..e18b287
28--- /dev/null
29+++ b/maitred/examples/relay.rs
30 @@ -0,0 +1,55 @@
31+ use mail_parser::MessageParser;
32+ use maitred::relay::{Error, Relay, Sorted};
33+
34+ const DNS_RESOLUTION_ENABLED: bool = false;
35+
36+ const TEST_EMAIL: &str = r#"From: hello@ayllu-forge.org
37+ To: dev@localhost
38+ Cc: Fuu Bar <kevin@ayllu-forge.org>
39+ Subject: [PATCH] add delivery parameters for mail module in db crate
40+ Date: Mon, 23 Dec 2024 18:49:34 +0100
41+ Message-ID: <20241223174934.5903-1-hello@ayllu-forge.org>
42+ X-Mailer: git-send-email 2.47.1
43+ MIME-Version: 1.0
44+ Content-Transfer-Encoding: 8bit
45+
46+ From: Fuu Bar <me@example.org>
47+
48+ ---
49+ ayllu-mail/src/delivery.rs | 12 +++++-----
50+
51+ TRUNCATED
52+ "#;
53+
54+ #[tokio::main]
55+ async fn main() {
56+ tracing_subscriber::fmt()
57+ .compact()
58+ .with_line_number(true)
59+ .init();
60+ maitred::crypto::init();
61+ let parser = MessageParser::new();
62+ let message = parser.parse(TEST_EMAIL).unwrap();
63+ let sorted = Sorted::from_message(&message).unwrap();
64+ let relay = Relay::builder()
65+ .port(2525)
66+ .resolve_dns(DNS_RESOLUTION_ENABLED)
67+ .build();
68+ for (domain, envelope) in sorted.0.iter() {
69+ println!("Delivering message to domain: {}", domain);
70+ match relay.send(domain, envelope, message.raw_message()).await {
71+ Ok(_) => {
72+ println!("Message delivered successfully");
73+ }
74+ Err(Error::LettreTransport(errors)) => {
75+ eprintln!("All delivery attempts failed:");
76+ for (i, attempt) in errors.iter().enumerate() {
77+ eprintln!("\tAttempt {}: {}", i, attempt);
78+ }
79+ }
80+ Err(e) => {
81+ eprintln!("Failed to send message: {}", e);
82+ }
83+ }
84+ }
85+ }
86 diff --git a/maitred/src/client.rs b/maitred/src/client.rs
87deleted file mode 100644
88index a1956ab..0000000
89--- a/maitred/src/client.rs
90+++ /dev/null
91 @@ -1,130 +0,0 @@
92- use hickory_resolver::error::ResolveError;
93- use hickory_resolver::proto::rr::rdata::MX;
94- use hickory_resolver::system_conf::read_system_conf;
95- use hickory_resolver::TokioAsyncResolver;
96- use lettre::error::Error as LettreError;
97- use lettre::{
98- address::AddressError, message::Mailbox, Address as LettreAddress, AsyncSmtpTransport,
99- Message as LettreMessage,
100- };
101- use mail_parser::{Addr, Message};
102-
103- #[derive(Debug, thiserror::Error)]
104- pub enum Error {
105- #[error("Message does not contain a TO field")]
106- NoToAddress,
107- #[error("Message does not contain a FROM field")]
108- NoFromAddress,
109- #[error("Cannot parse email address: {0}")]
110- Address(#[from] AddressError),
111- #[error("Client error: {0}")]
112- Lettre(#[from] LettreError),
113- #[error("DNS Resolution: {0}")]
114- Resolution(#[from] ResolveError),
115- }
116-
117- pub struct Client {}
118-
119- async fn resolve_mx_record(domain: &str) -> Result<Vec<MX>, ResolveError> {
120- let (cfg, opts) = read_system_conf()?;
121- let resolver = TokioAsyncResolver::tokio(cfg, opts);
122- let response = resolver.mx_lookup(domain).await?;
123- let mut records: Vec<MX> = response.iter().cloned().collect();
124- records.sort_by_key(|record| record.preference());
125- Ok(records)
126- }
127-
128- fn read_mailbox(input: &Addr<'_>) -> Result<Mailbox, Error> {
129- let addr_str = input
130- .address()
131- .map_or(Err(Error::NoFromAddress), |x| Ok(x.to_string()))?;
132- Ok(Mailbox::new(
133- input.name().map(|name| name.to_string()),
134- addr_str.parse::<LettreAddress>()?,
135- ))
136- }
137-
138- pub async fn send(recipient: &Mailbox, message: Message<'_>) -> Result<(), Error> {
139- let from = message.from().map_or(Err(Error::NoFromAddress), |from| {
140- from.first()
141- .map_or(Err(Error::NoFromAddress), |from| read_mailbox(from))
142- })?;
143- let mx_records = resolve_mx_record(from.email.domain()).await?;
144- for record in mx_records {
145- tracing::info!("Attempting to deliver message to domain: {}", record);
146- // let transport = AsyncSmtpTransport::relay(&record.exchange().to_string())
147- // .unwrap()
148- // .build();
149- // // if !transport.test_connection().await? {
150- // tracing::info!("Cannot establish connection to client");
151- // continue;
152- // }
153- }
154- todo!()
155- }
156-
157- pub fn recipients(message: Message<'_>) -> Result<Vec<Mailbox>, Error> {
158- let to = message.to().map_or(Err(Error::NoToAddress), |to| {
159- to.first()
160- .map_or(Err(Error::NoToAddress), |to| read_mailbox(to))
161- })?;
162- let cc = message
163- .cc()
164- .map(|cc| {
165- let values: Result<Vec<Mailbox>, Error> =
166- cc.iter().try_fold(Vec::new(), |mut accm, x| {
167- let mailbox = read_mailbox(x)?;
168- accm.push(mailbox);
169- Ok(accm)
170- });
171- values
172- })
173- .transpose()?;
174- Ok([Vec::from_iter([to]), cc.unwrap_or_default()].concat())
175- }
176-
177- #[cfg(test)]
178- mod test {
179- use std::time::Duration;
180-
181- use super::*;
182- use mail_parser::MessageParser;
183- use port_check::free_local_ipv4_port;
184-
185- const TEST_EMAIL: &str = r#"From: hello@ayllu-forge.org
186- To: dev@ayllu-dev.local
187- Cc: Fuu Bar <me@example.org>
188- Subject: [PATCH] add delivery parameters for mail module in db crate
189- Date: Mon, 23 Dec 2024 18:49:34 +0100
190- Message-ID: <20241223174934.5903-1-hello@ayllu-forge.org>
191- X-Mailer: git-send-email 2.47.1
192- MIME-Version: 1.0
193- Content-Transfer-Encoding: 8bit
194-
195- From: Fuu Bar <me@example.org>
196-
197- ---
198- ayllu-mail/src/delivery.rs | 12 +++++-----
199-
200- TRUNCATED
201- "#;
202-
203- #[test]
204- fn recipient_parsing() {
205- let parser = MessageParser::new();
206- let message = parser.parse(TEST_EMAIL).unwrap();
207- let mailboxes = recipients(message).unwrap();
208- assert!(mailboxes.len() == 2);
209- assert!(mailboxes.first().unwrap().email.domain() == "ayllu-dev.local");
210- assert!(mailboxes.get(1).unwrap().email.domain() == "example.org");
211- }
212-
213- #[tokio::test]
214- async fn client_server() {
215- let test_addr = format!("127.0.0.1:{}", free_local_ipv4_port().unwrap());
216- tokio::task::spawn(async move {
217- let mut test_server = crate::Server::default().address(&test_addr);
218- test_server.listen().await.unwrap();
219- });
220- }
221- }
222 diff --git a/maitred/src/crypto.rs b/maitred/src/crypto.rs
223new file mode 100644
224index 0000000..0b8cdda
225--- /dev/null
226+++ b/maitred/src/crypto.rs
227 @@ -0,0 +1,8 @@
228+ use rustls::crypto::{aws_lc_rs, CryptoProvider};
229+
230+ /// Initialize the default crypto provider, required if using opportunistic TLS.
231+ pub fn init() {
232+ if CryptoProvider::get_default().is_none() {
233+ CryptoProvider::install_default(aws_lc_rs::default_provider()).unwrap()
234+ }
235+ }
236 diff --git a/maitred/src/lib.rs b/maitred/src/lib.rs
237index 7b045b9..c842e39 100644
238--- a/maitred/src/lib.rs
239+++ b/maitred/src/lib.rs
240 @@ -103,8 +103,11 @@ pub mod verify;
241 #[doc(inline)]
242 pub use verify::{Verify, VerifyError, VerifyFunc};
243 /// DNS Resolution Helper
244- #[cfg(feature = "client")]
245- pub mod client;
246+ #[cfg(feature = "relay")]
247+ pub mod relay;
248+ /// Crypto helpers
249+ #[cfg(any(feature = "server", feature="relay"))]
250+ pub mod crypto;
251
252 /// Generate a single smtp_response
253 macro_rules! smtp_response {
254 diff --git a/maitred/src/relay.rs b/maitred/src/relay.rs
255new file mode 100644
256index 0000000..d52c4fa
257--- /dev/null
258+++ b/maitred/src/relay.rs
259 @@ -0,0 +1,290 @@
260+ use std::collections::HashMap;
261+ use std::fmt::Display;
262+ use std::time::Duration;
263+
264+ use hickory_resolver::error::ResolveError;
265+ use hickory_resolver::proto::rr::rdata::MX;
266+ use hickory_resolver::system_conf::read_system_conf;
267+ use hickory_resolver::TokioAsyncResolver;
268+ use lettre::address::Envelope as LettreEnvelope;
269+ use lettre::error::Error as LettreError;
270+ use lettre::transport::smtp::client::{Tls, TlsParameters};
271+ use lettre::transport::smtp::extension::ClientId;
272+ use lettre::transport::smtp::Error as LettreTransportError;
273+ use lettre::{address::AddressError, Address as LettreAddress, AsyncTransport};
274+ use lettre::{AsyncSmtpTransport, Tokio1Executor};
275+ use mail_parser::{Address, Message};
276+
277+ const DEFAULT_SUBMISSION_PORT: u16 = 25;
278+ const DEFAULT_TIMEOUT_MS: u64 = 5000;
279+
280+ /// Relay level errors
281+ #[derive(Debug, thiserror::Error)]
282+ pub enum Error {
283+ #[error("Message does not contain a TO field")]
284+ NoToAddress,
285+ #[error("Message does not contain a FROM field")]
286+ NoFromAddress,
287+ #[error("Cannot parse email address: {0}")]
288+ Address(#[from] AddressError),
289+ #[error("Client error: {0}")]
290+ Lettre(#[from] LettreError),
291+ #[error("Lettre transport failures")]
292+ LettreTransport(Vec<LettreTransportError>),
293+ #[error("DNS Resolution: {0}")]
294+ Resolution(#[from] ResolveError),
295+ }
296+
297+ fn addresses(addr: Option<&Address<'_>>) -> Result<Vec<LettreAddress>, Error> {
298+ Ok(addr
299+ .map(|addr| {
300+ addr.iter().try_fold(Vec::new(), |mut accm, addr| {
301+ let address: LettreAddress = addr.address().unwrap().parse()?;
302+ accm.push(address);
303+ Ok::<Vec<LettreAddress>, Error>(accm)
304+ })
305+ })
306+ .transpose()?
307+ .unwrap_or(Vec::new()))
308+ }
309+
310+ /// Sorted pairs of envelopes organized by domain
311+ #[derive(Clone, Default)]
312+ pub struct Sorted(pub HashMap<Hostname, LettreEnvelope>);
313+
314+ impl Sorted {
315+ /// Return a sorted list of domains specified in the message with their
316+ /// envelope for use in the Lettre transport.
317+ pub fn from_message(message: &Message<'_>) -> Result<Self, Error> {
318+ let from = message.from().map_or(Err(Error::NoFromAddress), |from| {
319+ from.first().map_or(Err(Error::NoFromAddress), |from| {
320+ let address: LettreAddress = from.address().unwrap().parse()?;
321+ Ok(address)
322+ })
323+ })?;
324+ let unsorted = [
325+ addresses(message.to())?,
326+ addresses(message.cc())?,
327+ addresses(message.bcc())?, // TODO: Should BCC work the same?
328+ ]
329+ .concat();
330+ let sorted: HashMap<String, Vec<LettreAddress>> =
331+ unsorted.iter().fold(HashMap::new(), |mut accm, address| {
332+ if let Some(addresses) = accm.get_mut(address.domain()) {
333+ addresses.push(address.clone());
334+ } else {
335+ accm.insert(address.domain().to_string(), vec![address.clone()]);
336+ }
337+ accm
338+ });
339+ Ok(Sorted(sorted.iter().fold(
340+ HashMap::new(),
341+ |mut accm, (domain, addresses)| {
342+ accm.insert(
343+ Hostname::new(domain.as_ref()),
344+ LettreEnvelope::new(Some(from.clone()), addresses.clone()).unwrap(),
345+ );
346+ accm
347+ },
348+ )))
349+ }
350+ }
351+
352+ /// RFC1123 Hostname
353+ /// TODO: Actually implement RFC1123
354+ #[derive(Clone, Debug, Eq, Hash, PartialEq)]
355+ pub struct Hostname(String);
356+
357+ impl Hostname {
358+ pub fn new(hostname: &str) -> Self {
359+ Hostname(hostname.to_string())
360+ }
361+ }
362+
363+ impl From<Hostname> for String {
364+ fn from(val: Hostname) -> Self {
365+ val.0.clone()
366+ }
367+ }
368+
369+ impl From<&str> for Hostname {
370+ fn from(value: &str) -> Self {
371+ Hostname(value.to_string())
372+ }
373+ }
374+
375+ impl Display for Hostname {
376+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
377+ f.write_str(self.0.as_ref())
378+ }
379+ }
380+
381+ #[derive(Clone)]
382+ pub enum TlsConfiguration {
383+ Insecure,
384+ Opportunistic,
385+ }
386+
387+ #[derive(Default)]
388+ pub struct RelayBuilder {
389+ hostname: Option<ClientId>,
390+ port: Option<u16>,
391+ tls: Option<TlsConfiguration>,
392+ resolve_dns: Option<bool>,
393+ }
394+
395+ impl RelayBuilder {
396+ pub fn build(&self) -> Relay {
397+ Relay {
398+ hostname: self
399+ .hostname
400+ .as_ref()
401+ .map_or(ClientId::default(), |hostname| hostname.clone()),
402+ port: self.port.unwrap_or(DEFAULT_SUBMISSION_PORT),
403+ tls: match self.tls {
404+ Some(TlsConfiguration::Insecure) => TlsConfiguration::Insecure,
405+ Some(TlsConfiguration::Opportunistic) | None => TlsConfiguration::Opportunistic,
406+ },
407+ resolve_dns: self.resolve_dns.is_some_and(|resolve| resolve),
408+ }
409+ }
410+
411+ pub fn insecure(mut self) -> Self {
412+ self.tls = Some(TlsConfiguration::Insecure);
413+ self
414+ }
415+
416+ pub fn port(mut self, port: u16) -> Self {
417+ self.port = Some(port);
418+ self
419+ }
420+
421+ pub fn resolve_dns(mut self, enabled: bool) -> Self {
422+ self.resolve_dns = Some(enabled);
423+ self
424+ }
425+ }
426+
427+ /// Implements a message relay
428+ pub struct Relay {
429+ /// Hostname advertised in during SMTP HELO
430+ hostname: ClientId,
431+ port: u16,
432+ tls: TlsConfiguration,
433+ /// If enabled DNS resolution will be performed to lookup MX records for
434+ /// the host. If disabled then the hostname will is assumed to be literal
435+ /// and an SMTP connection is established.
436+ resolve_dns: bool,
437+ }
438+
439+ impl Relay {
440+ pub fn builder() -> RelayBuilder {
441+ RelayBuilder::default()
442+ }
443+
444+ fn transport(&self, hostname: Hostname) -> Result<AsyncSmtpTransport<Tokio1Executor>, Error> {
445+ let builder = AsyncSmtpTransport::<Tokio1Executor>::builder_dangerous(hostname.clone())
446+ .hello_name(ClientId::Domain(self.hostname.to_string()))
447+ .timeout(Some(Duration::from_secs(DEFAULT_TIMEOUT_MS)))
448+ .port(self.port)
449+ .tls(match self.tls {
450+ TlsConfiguration::Insecure => Tls::None,
451+ TlsConfiguration::Opportunistic => {
452+ Tls::Opportunistic(TlsParameters::builder(hostname.0).build_rustls().unwrap())
453+ }
454+ });
455+ let transport = builder.build();
456+ Ok(transport)
457+ }
458+
459+ async fn resolve_mx_record(&self, domain: &str) -> Result<Vec<Hostname>, ResolveError> {
460+ tracing::info!("Looking up MX records for domain: {}", domain);
461+ let (cfg, opts) = read_system_conf()?;
462+ let resolver = TokioAsyncResolver::tokio(cfg, opts);
463+ let response = resolver.mx_lookup(domain).await?;
464+ let mut records: Vec<MX> = response.iter().cloned().collect();
465+ records.sort_by_key(|record| record.preference());
466+ records.iter().for_each(|record| {
467+ tracing::info!("Resolved record: {}", record.to_string());
468+ });
469+ Ok(records
470+ .iter()
471+ .map(|record| Hostname::new(&record.exchange().to_utf8())) // FIXME ?
472+ .collect())
473+ }
474+
475+ /// Send a raw message
476+ pub async fn send(
477+ &self,
478+ hostname: &Hostname,
479+ envelope: &LettreEnvelope,
480+ message: &[u8],
481+ ) -> Result<(), Error> {
482+ let hostnames = if self.resolve_dns {
483+ self.resolve_mx_record(&hostname.0).await?
484+ } else {
485+ vec![hostname.clone()]
486+ };
487+ let mut failures: Vec<LettreTransportError> = Vec::new();
488+ for hostname in hostnames {
489+ tracing::info!("Attempting to deliver message to mail server: {}", hostname);
490+ let transport = self.transport(hostname.clone())?;
491+ match transport.send_raw(envelope, message).await {
492+ Ok(_) => return Ok(()),
493+ Err(e) => {
494+ tracing::warn!("Failed to relay message to mail server: {}", e);
495+ failures.push(e);
496+ continue;
497+ }
498+ }
499+ }
500+ Err(Error::LettreTransport(failures))
501+ }
502+ }
503+
504+ #[cfg(test)]
505+ mod test {
506+
507+ use super::*;
508+ use mail_parser::MessageParser;
509+
510+ const TEST_EMAIL: &str = r#"From: hello@ayllu-forge.org
511+ To: dev@ayllu-dev.local
512+ Cc: Fuu Bar <me@example.org>
513+ Subject: [PATCH] add delivery parameters for mail module in db crate
514+ Date: Mon, 23 Dec 2024 18:49:34 +0100
515+ Message-ID: <20241223174934.5903-1-hello@ayllu-forge.org>
516+ X-Mailer: git-send-email 2.47.1
517+ MIME-Version: 1.0
518+ Content-Transfer-Encoding: 8bit
519+
520+ From: Fuu Bar <me@example.org>
521+
522+ ---
523+ ayllu-mail/src/delivery.rs | 12 +++++-----
524+
525+ TRUNCATED
526+ "#;
527+
528+ #[test]
529+ fn sorted_parsing() {
530+ let parser = MessageParser::new();
531+ let message = parser.parse(TEST_EMAIL).unwrap();
532+ let sorted = Sorted::from_message(&message).unwrap();
533+ assert!(sorted.0.len() == 2);
534+ let d1 = sorted.0.get(&"ayllu-dev.local".into()).unwrap();
535+ let d1_from = d1.from().unwrap();
536+ assert!(d1_from.user() == "hello");
537+ assert!(d1_from.domain() == "ayllu-forge.org");
538+ assert!(d1.to().len() == 1);
539+ assert!(d1.to()[0].user() == "dev");
540+ assert!(d1.to()[0].domain() == "ayllu-dev.local");
541+ let d2 = sorted.0.get(&"example.org".into()).unwrap();
542+ let d2_from = d2.from().unwrap();
543+ assert!(d2_from.user() == "hello");
544+ assert!(d2_from.domain() == "ayllu-forge.org");
545+ assert!(d2.to().len() == 1);
546+ assert!(d2.to()[0].user() == "me");
547+ assert!(d2.to()[0].domain() == "example.org");
548+ }
549+ }