Commit
Author: Kevin Schoon [me@kevinschoon.com]
Hash: 8a639449f7b82930bc2a8bc74b838ac0b85b5ee5
Timestamp: Sat, 17 Aug 2024 22:43:37 +0000 (4 months ago)

+22 -17 +/-2 browse
wire up delivery
1diff --git a/maitred/src/server.rs b/maitred/src/server.rs
2index 0b371b7..d6cff2b 100644
3--- a/maitred/src/server.rs
4+++ b/maitred/src/server.rs
5 @@ -202,13 +202,12 @@ where
6 self.shutdown_handles.push(tx);
7 let global_queue = global_queue.clone();
8 let stealers = stealers.clone();
9- let milter = self
10- .milter
11- .as_ref()
12- .map_or(None, |milter| Some(milter.clone()));
13+ let milter = self.milter.clone().expect("Milter not configured");
14+ let delivery = self.delivery.clone().expect("Delivery not configured");
15 tokio::task::spawn(async move {
16 let mut worker = Worker {
17 milter,
18+ delivery,
19 global_queue,
20 stealers,
21 local_queue: Arc::new(Mutex::new(local_queue)),
22 diff --git a/maitred/src/worker.rs b/maitred/src/worker.rs
23index 86e5878..d5160fd 100644
24--- a/maitred/src/worker.rs
25+++ b/maitred/src/worker.rs
26 @@ -9,7 +9,7 @@ use tokio::sync::{mpsc::Receiver, Mutex};
27 use tokio_stream::{self as stream};
28 use url::Host;
29
30- use crate::{Error, Milter, Session};
31+ use crate::{Delivery, Error, Milter, Session};
32
33 /// Session details to be passed internally for processing
34 #[derive(Clone, Debug)]
35 @@ -38,19 +38,22 @@ impl From<Session> for Packet {
36 /// Running DKIM verification
37 /// ARC Verficiation
38 /// SPF Verification
39- pub(crate) struct Worker<M>
40+ pub(crate) struct Worker<D, M>
41 where
42+ D: Delivery + Clone + Send + Sync,
43 M: Milter + Clone + Send + Sync,
44 {
45- pub milter: Option<Arc<M>>,
46+ pub milter: Arc<M>,
47+ pub delivery: Arc<D>,
48 pub global_queue: Arc<Injector<Packet>>,
49 pub stealers: Vec<Stealer<Packet>>,
50 pub local_queue: Arc<Mutex<WorkQueue<Packet>>>,
51 pub shutdown_rx: Receiver<bool>,
52 }
53
54- impl<M> Worker<M>
55+ impl<D, M> Worker<D, M>
56 where
57+ D: Delivery + Clone + Send + Sync,
58 M: Milter + Clone + Send + Sync + 'static,
59 {
60 async fn next_packet(&self) -> Option<Packet> {
61 @@ -79,16 +82,19 @@ where
62
63 if let Some(packet) = self.next_packet().await {
64 let mut message = packet.body.unwrap();
65- if let Some(milter) = self.milter.as_ref() {
66- match milter.apply(message).await {
67- Ok(modified) => {
68- tracing::info!("Milter finished successfully");
69- message = modified;
70- }
71- Err(err) => {
72- tracing::warn!("Milter failed to apply: {:?}", err);
73- }
74+ match self.milter.apply(message.clone()).await {
75+ Ok(modified) => {
76+ tracing::info!("Milter finished successfully");
77+ message = modified;
78 }
79+ Err(err) => {
80+ tracing::warn!("Milter failed to apply: {:?}", err);
81+ }
82+ };
83+ match self.delivery.deliver(message.clone()).await {
84+ Ok(_) => tracing::info!("Message successfully delivered"),
85+ // TODO: Implement retry here
86+ Err(err) => tracing::warn!("Message could not be delievered: {:?}", err),
87 }
88 } else {
89 ticker.tick().await;