+325 -54 +/-15 browse
1 | diff --git a/cli/src/main.rs b/cli/src/main.rs |
2 | index dc9d80b..26d0c3a 100644 |
3 | --- a/cli/src/main.rs |
4 | +++ b/cli/src/main.rs |
5 | @@ -30,6 +30,7 @@ use mailpot::{ |
6 | melib::{backends::maildir::MaildirPathTrait, smol, smtp::*, Envelope, EnvelopeHash}, |
7 | models::{changesets::*, *}, |
8 | queue::{Queue, QueueEntry}, |
9 | + transaction::TransactionBehavior, |
10 | Configuration, Connection, Error, ErrorKind, Result, *, |
11 | }; |
12 | use mailpot_cli::*; |
13 | @@ -507,6 +508,7 @@ fn run_app(opt: Opt) -> Result<()> { |
14 | println!("post dry_run{:?}", dry_run); |
15 | } |
16 | |
17 | + let tx = db.transaction(TransactionBehavior::Exclusive).unwrap(); |
18 | let mut input = String::new(); |
19 | std::io::stdin().read_to_string(&mut input)?; |
20 | match Envelope::from_bytes(input.as_bytes(), None) { |
21 | @@ -514,7 +516,7 @@ fn run_app(opt: Opt) -> Result<()> { |
22 | if opt.debug { |
23 | eprintln!("{:?}", &env); |
24 | } |
25 | - db.post(&env, input.as_bytes(), dry_run)?; |
26 | + tx.post(&env, input.as_bytes(), dry_run)?; |
27 | } |
28 | Err(err) if input.trim().is_empty() => { |
29 | eprintln!("Empty input, abort."); |
30 | @@ -522,21 +524,28 @@ fn run_app(opt: Opt) -> Result<()> { |
31 | } |
32 | Err(err) => { |
33 | eprintln!("Could not parse message: {}", err); |
34 | - let p = db.conf().save_message(input)?; |
35 | + let p = tx.conf().save_message(input)?; |
36 | eprintln!("Message saved at {}", p.display()); |
37 | return Err(err.into()); |
38 | } |
39 | } |
40 | + tx.commit()?; |
41 | } |
42 | FlushQueue { dry_run } => { |
43 | + let tx = db.transaction(TransactionBehavior::Exclusive).unwrap(); |
44 | let messages = if opt.debug { |
45 | println!("flush-queue dry_run {:?}", dry_run); |
46 | - db.queue(Queue::Out)? |
47 | + tx.queue(Queue::Out)? |
48 | .into_iter() |
49 | .map(DbVal::into_inner) |
50 | + .chain( |
51 | + tx.queue(Queue::Deferred)? |
52 | + .into_iter() |
53 | + .map(DbVal::into_inner), |
54 | + ) |
55 | .collect() |
56 | } else { |
57 | - db.delete_from_queue(Queue::Out, vec![])? |
58 | + tx.delete_from_queue(Queue::Out, vec![])? |
59 | }; |
60 | if opt.verbose > 0 || opt.debug { |
61 | println!("Queue out has {} messages.", messages.len()); |
62 | @@ -544,7 +553,7 @@ fn run_app(opt: Opt) -> Result<()> { |
63 | |
64 | let mut failures = Vec::with_capacity(messages.len()); |
65 | |
66 | - let send_mail = db.conf().send_mail.clone(); |
67 | + let send_mail = tx.conf().send_mail.clone(); |
68 | match send_mail { |
69 | mailpot::SendMail::ShellCommand(cmd) => { |
70 | fn submit(cmd: &str, msg: &QueueEntry) -> Result<()> { |
71 | @@ -589,18 +598,27 @@ fn run_app(opt: Opt) -> Result<()> { |
72 | } |
73 | } |
74 | mailpot::SendMail::Smtp(_) => { |
75 | - let conn_future = db.new_smtp_connection()?; |
76 | - smol::future::block_on(smol::spawn(async move { |
77 | + let conn_future = tx.new_smtp_connection()?; |
78 | + failures = smol::future::block_on(smol::spawn(async move { |
79 | let mut conn = conn_future.await?; |
80 | for msg in messages { |
81 | if let Err(err) = Connection::submit(&mut conn, &msg, dry_run).await { |
82 | failures.push((err, msg)); |
83 | } |
84 | } |
85 | - Ok::<(), Error>(()) |
86 | + Ok::<_, Error>(failures) |
87 | }))?; |
88 | } |
89 | } |
90 | + |
91 | + for (err, mut msg) in failures { |
92 | + log::error!("Message {msg:?} failed with: {err}. Inserting to Deferred queue."); |
93 | + |
94 | + msg.queue = Queue::Deferred; |
95 | + tx.insert_to_queue(msg)?; |
96 | + } |
97 | + |
98 | + tx.commit()?; |
99 | } |
100 | ErrorQueue { cmd } => match cmd { |
101 | ErrorQueueCommand::List => { |
102 | diff --git a/cli/tests/out_queue_flush.rs b/cli/tests/out_queue_flush.rs |
103 | index fabc966..e962fd2 100644 |
104 | --- a/cli/tests/out_queue_flush.rs |
105 | +++ b/cli/tests/out_queue_flush.rs |
106 | @@ -136,7 +136,7 @@ fn test_out_queue_flush() { |
107 | log::info!("Subscribe two users, Αλίκη and Χαραλάμπης to foo-chat."); |
108 | |
109 | { |
110 | - let mut db = Connection::open_or_create_db(config.clone()) |
111 | + let db = Connection::open_or_create_db(config.clone()) |
112 | .unwrap() |
113 | .trusted(); |
114 | |
115 | @@ -204,7 +204,7 @@ fn test_out_queue_flush() { |
116 | ); |
117 | |
118 | { |
119 | - let mut db = Connection::open_or_create_db(config.clone()) |
120 | + let db = Connection::open_or_create_db(config.clone()) |
121 | .unwrap() |
122 | .trusted(); |
123 | let mail = generate_mail("Χαραλάμπης", "", "hello world", "Hello there.", &mut seq); |
124 | @@ -332,7 +332,7 @@ fn test_list_requests_submission() { |
125 | log::info!("User Αλίκη sends to foo-chat+request with subject 'help'."); |
126 | |
127 | { |
128 | - let mut db = Connection::open_or_create_db(config).unwrap().trusted(); |
129 | + let db = Connection::open_or_create_db(config).unwrap().trusted(); |
130 | |
131 | let mail = generate_mail("Αλίκη", "+request", "help", "", &mut seq); |
132 | let subenvelope = mailpot::melib::Envelope::from_bytes(mail.as_bytes(), None) |
133 | diff --git a/core/Cargo.toml b/core/Cargo.toml |
134 | index f411ab4..ba0c794 100644 |
135 | --- a/core/Cargo.toml |
136 | +++ b/core/Cargo.toml |
137 | @@ -17,7 +17,7 @@ error-chain = { version = "0.12.4", default-features = false } |
138 | log = "0.4" |
139 | melib = { version = "*", default-features = false, features = ["smtp", "unicode_algorithms", "maildir_backend"], git = "https://github.com/meli/meli", rev = "2447a2c" } |
140 | minijinja = { version = "0.31.0", features = ["source", ] } |
141 | - rusqlite = { version = "^0.28", features = ["bundled", "trace", "hooks", "serde_json", "array", "chrono"] } |
142 | + rusqlite = { version = "^0.28", features = ["bundled", "trace", "hooks", "serde_json", "array", "chrono", "unlock_notify"] } |
143 | serde = { version = "^1", features = ["derive", ] } |
144 | serde_json = "^1" |
145 | toml = "^0.5" |
146 | diff --git a/core/src/connection.rs b/core/src/connection.rs |
147 | index 2c4580e..aa1866e 100644 |
148 | --- a/core/src/connection.rs |
149 | +++ b/core/src/connection.rs |
150 | @@ -199,7 +199,7 @@ impl Connection { |
151 | conn.busy_timeout(core::time::Duration::from_millis(500))?; |
152 | conn.busy_handler(Some(|times: i32| -> bool { times < 5 }))?; |
153 | |
154 | - let mut ret = Self { |
155 | + let ret = Self { |
156 | conf, |
157 | connection: conn, |
158 | }; |
159 | @@ -232,13 +232,13 @@ impl Connection { |
160 | /// Migrate from version `from` to `to`. |
161 | /// |
162 | /// See [Self::MIGRATIONS]. |
163 | - pub fn migrate(&mut self, mut from: u32, to: u32) -> Result<()> { |
164 | + pub fn migrate(&self, mut from: u32, to: u32) -> Result<()> { |
165 | if from == to { |
166 | return Ok(()); |
167 | } |
168 | |
169 | let undo = from > to; |
170 | - let tx = self.connection.transaction()?; |
171 | + let tx = self.savepoint(Some(stringify!(migrate)))?; |
172 | |
173 | while from != to { |
174 | log::trace!( |
175 | @@ -247,15 +247,18 @@ impl Connection { |
176 | ); |
177 | if undo { |
178 | trace!("{}", Self::MIGRATIONS[from as usize].2); |
179 | - tx.execute(Self::MIGRATIONS[from as usize].2, [])?; |
180 | + tx.connection |
181 | + .execute(Self::MIGRATIONS[from as usize].2, [])?; |
182 | from -= 1; |
183 | } else { |
184 | trace!("{}", Self::MIGRATIONS[from as usize].1); |
185 | - tx.execute(Self::MIGRATIONS[from as usize].1, [])?; |
186 | + tx.connection |
187 | + .execute(Self::MIGRATIONS[from as usize].1, [])?; |
188 | from += 1; |
189 | } |
190 | } |
191 | - tx.pragma_update(None, "user_version", Self::MIGRATIONS[to as usize - 1].0)?; |
192 | + tx.connection |
193 | + .pragma_update(None, "user_version", Self::MIGRATIONS[to as usize - 1].0)?; |
194 | |
195 | tx.commit()?; |
196 | |
197 | @@ -354,10 +357,10 @@ impl Connection { |
198 | } |
199 | |
200 | /// Loads archive databases from [`Configuration::data_path`], if any. |
201 | - pub fn load_archives(&mut self) -> Result<()> { |
202 | - let tx = self.connection.transaction()?; |
203 | + pub fn load_archives(&self) -> Result<()> { |
204 | + let tx = self.savepoint(Some(stringify!(load_archives)))?; |
205 | { |
206 | - let mut stmt = tx.prepare("ATTACH ? AS ?;")?; |
207 | + let mut stmt = tx.connection.prepare("ATTACH ? AS ?;")?; |
208 | for archive in std::fs::read_dir(&self.conf.data_path)? { |
209 | let archive = archive?; |
210 | let path = archive.path(); |
211 | @@ -611,7 +614,7 @@ impl Connection { |
212 | } |
213 | |
214 | /// Update a mailing list. |
215 | - pub fn update_list(&mut self, change_set: MailingListChangeset) -> Result<()> { |
216 | + pub fn update_list(&self, change_set: MailingListChangeset) -> Result<()> { |
217 | if matches!( |
218 | change_set, |
219 | MailingListChangeset { |
220 | @@ -644,12 +647,12 @@ impl Connection { |
221 | hidden, |
222 | enabled, |
223 | } = change_set; |
224 | - let tx = self.connection.transaction()?; |
225 | + let tx = self.savepoint(Some(stringify!(update_list)))?; |
226 | |
227 | macro_rules! update { |
228 | ($field:tt) => {{ |
229 | if let Some($field) = $field { |
230 | - tx.execute( |
231 | + tx.connection.execute( |
232 | concat!("UPDATE list SET ", stringify!($field), " = ? WHERE pk = ?;"), |
233 | rusqlite::params![&$field, &pk], |
234 | )?; |
235 | @@ -673,7 +676,7 @@ impl Connection { |
236 | |
237 | /// Execute operations inside an SQL transaction. |
238 | pub fn transaction( |
239 | - &'_ self, |
240 | + &'_ mut self, |
241 | behavior: transaction::TransactionBehavior, |
242 | ) -> Result<transaction::Transaction<'_>> { |
243 | use transaction::*; |
244 | @@ -689,6 +692,30 @@ impl Connection { |
245 | drop_behavior: DropBehavior::Rollback, |
246 | }) |
247 | } |
248 | + |
249 | + /// Execute operations inside an SQL savepoint. |
250 | + pub fn savepoint(&'_ self, name: Option<&'static str>) -> Result<transaction::Savepoint<'_>> { |
251 | + use std::sync::atomic::{AtomicUsize, Ordering}; |
252 | + |
253 | + use transaction::*; |
254 | + static COUNTER: AtomicUsize = AtomicUsize::new(0); |
255 | + |
256 | + let name = name |
257 | + .map(Ok) |
258 | + .unwrap_or_else(|| Err(COUNTER.fetch_add(1, Ordering::Relaxed))); |
259 | + |
260 | + match name { |
261 | + Ok(ref n) => self.connection.execute_batch(&format!("SAVEPOINT {n}"))?, |
262 | + Err(ref i) => self.connection.execute_batch(&format!("SAVEPOINT _{i}"))?, |
263 | + }; |
264 | + |
265 | + Ok(Savepoint { |
266 | + conn: self, |
267 | + drop_behavior: DropBehavior::Rollback, |
268 | + name, |
269 | + committed: false, |
270 | + }) |
271 | + } |
272 | } |
273 | |
274 | /// Execute operations inside an SQL transaction. |
275 | @@ -698,7 +725,7 @@ pub mod transaction { |
276 | /// A transaction handle. |
277 | #[derive(Debug)] |
278 | pub struct Transaction<'conn> { |
279 | - pub(super) conn: &'conn Connection, |
280 | + pub(super) conn: &'conn mut Connection, |
281 | pub(super) drop_behavior: DropBehavior, |
282 | } |
283 | |
284 | @@ -778,10 +805,10 @@ pub mod transaction { |
285 | /// DEFERRED means that the transaction does not actually start until |
286 | /// the database is first accessed. |
287 | Deferred, |
288 | + #[default] |
289 | /// IMMEDIATE cause the database connection to start a new write |
290 | /// immediately, without waiting for a writes statement. |
291 | Immediate, |
292 | - #[default] |
293 | /// EXCLUSIVE prevents other database connections from reading the |
294 | /// database while the transaction is underway. |
295 | Exclusive, |
296 | @@ -806,6 +833,106 @@ pub mod transaction { |
297 | /// Panic. Used to enforce intentional behavior during development. |
298 | Panic, |
299 | } |
300 | + |
301 | + /// A savepoint handle. |
302 | + #[derive(Debug)] |
303 | + pub struct Savepoint<'conn> { |
304 | + pub(super) conn: &'conn Connection, |
305 | + pub(super) drop_behavior: DropBehavior, |
306 | + pub(super) name: std::result::Result<&'static str, usize>, |
307 | + pub(super) committed: bool, |
308 | + } |
309 | + |
310 | + impl Drop for Savepoint<'_> { |
311 | + fn drop(&mut self) { |
312 | + _ = self.finish_(); |
313 | + } |
314 | + } |
315 | + |
316 | + impl Savepoint<'_> { |
317 | + /// Commit and consume savepoint. |
318 | + pub fn commit(mut self) -> Result<()> { |
319 | + self.commit_() |
320 | + } |
321 | + |
322 | + fn commit_(&mut self) -> Result<()> { |
323 | + if !self.committed { |
324 | + match self.name { |
325 | + Ok(ref n) => self |
326 | + .conn |
327 | + .connection |
328 | + .execute_batch(&format!("RELEASE SAVEPOINT {n}"))?, |
329 | + Err(ref i) => self |
330 | + .conn |
331 | + .connection |
332 | + .execute_batch(&format!("RELEASE SAVEPOINT _{i}"))?, |
333 | + }; |
334 | + self.committed = true; |
335 | + } |
336 | + Ok(()) |
337 | + } |
338 | + |
339 | + /// Configure the savepoint to perform the specified action when it is |
340 | + /// dropped. |
341 | + #[inline] |
342 | + pub fn set_drop_behavior(&mut self, drop_behavior: DropBehavior) { |
343 | + self.drop_behavior = drop_behavior; |
344 | + } |
345 | + |
346 | + /// A convenience method which consumes and rolls back a savepoint. |
347 | + #[inline] |
348 | + pub fn rollback(mut self) -> Result<()> { |
349 | + self.rollback_() |
350 | + } |
351 | + |
352 | + fn rollback_(&mut self) -> Result<()> { |
353 | + if !self.committed { |
354 | + match self.name { |
355 | + Ok(ref n) => self |
356 | + .conn |
357 | + .connection |
358 | + .execute_batch(&format!("ROLLBACK TO SAVEPOINT {n}"))?, |
359 | + Err(ref i) => self |
360 | + .conn |
361 | + .connection |
362 | + .execute_batch(&format!("ROLLBACK TO SAVEPOINT _{i}"))?, |
363 | + }; |
364 | + } |
365 | + Ok(()) |
366 | + } |
367 | + |
368 | + /// Consumes the savepoint, committing or rolling back according to |
369 | + /// the current setting (see `drop_behavior`). |
370 | + /// |
371 | + /// Functionally equivalent to the `Drop` implementation, but allows |
372 | + /// callers to see any errors that occur. |
373 | + #[inline] |
374 | + pub fn finish(mut self) -> Result<()> { |
375 | + self.finish_() |
376 | + } |
377 | + |
378 | + #[inline] |
379 | + fn finish_(&mut self) -> Result<()> { |
380 | + if self.conn.connection.is_autocommit() { |
381 | + return Ok(()); |
382 | + } |
383 | + match self.drop_behavior { |
384 | + DropBehavior::Commit => self.commit_().or_else(|_| self.rollback_()), |
385 | + DropBehavior::Rollback => self.rollback_(), |
386 | + DropBehavior::Ignore => Ok(()), |
387 | + DropBehavior::Panic => panic!("Savepoint dropped unexpectedly."), |
388 | + } |
389 | + } |
390 | + } |
391 | + |
392 | + impl std::ops::Deref for Savepoint<'_> { |
393 | + type Target = Connection; |
394 | + |
395 | + #[inline] |
396 | + fn deref(&self) -> &Connection { |
397 | + self.conn |
398 | + } |
399 | + } |
400 | } |
401 | |
402 | #[cfg(test)] |
403 | @@ -842,4 +969,124 @@ mod tests { |
404 | |
405 | _ = Connection::open_or_create_db(config).unwrap(); |
406 | } |
407 | + |
408 | + #[test] |
409 | + fn test_transactions() { |
410 | + use melib::smtp::{SmtpAuth, SmtpSecurity, SmtpServerConf}; |
411 | + use tempfile::TempDir; |
412 | + |
413 | + use super::transaction::*; |
414 | + use crate::SendMail; |
415 | + |
416 | + let tmp_dir = TempDir::new().unwrap(); |
417 | + let db_path = tmp_dir.path().join("mpot.db"); |
418 | + let data_path = tmp_dir.path().to_path_buf(); |
419 | + let config = Configuration { |
420 | + send_mail: SendMail::Smtp(SmtpServerConf { |
421 | + hostname: "127.0.0.1".into(), |
422 | + port: 25, |
423 | + envelope_from: "foo-chat@example.com".into(), |
424 | + auth: SmtpAuth::None, |
425 | + security: SmtpSecurity::None, |
426 | + extensions: Default::default(), |
427 | + }), |
428 | + db_path, |
429 | + data_path, |
430 | + administrators: vec![], |
431 | + }; |
432 | + let list = MailingList { |
433 | + pk: 0, |
434 | + name: "".into(), |
435 | + id: "".into(), |
436 | + description: None, |
437 | + address: "".into(), |
438 | + archive_url: None, |
439 | + }; |
440 | + let mut db = Connection::open_or_create_db(config).unwrap().trusted(); |
441 | + |
442 | + /* drop rollback */ |
443 | + let mut tx = db.transaction(Default::default()).unwrap(); |
444 | + tx.set_drop_behavior(DropBehavior::Rollback); |
445 | + let _new = tx.create_list(list.clone()).unwrap(); |
446 | + drop(tx); |
447 | + assert_eq!(&db.lists().unwrap(), &[]); |
448 | + |
449 | + /* drop commit */ |
450 | + let mut tx = db.transaction(Default::default()).unwrap(); |
451 | + tx.set_drop_behavior(DropBehavior::Commit); |
452 | + let new = tx.create_list(list.clone()).unwrap(); |
453 | + drop(tx); |
454 | + assert_eq!(&db.lists().unwrap(), &[new.clone()]); |
455 | + |
456 | + /* rollback with drop commit */ |
457 | + let mut tx = db.transaction(Default::default()).unwrap(); |
458 | + tx.set_drop_behavior(DropBehavior::Commit); |
459 | + let _new2 = tx |
460 | + .create_list(MailingList { |
461 | + id: "1".into(), |
462 | + address: "1".into(), |
463 | + ..list.clone() |
464 | + }) |
465 | + .unwrap(); |
466 | + tx.rollback().unwrap(); |
467 | + assert_eq!(&db.lists().unwrap(), &[new.clone()]); |
468 | + |
469 | + /* tx and then savepoint */ |
470 | + let tx = db.transaction(Default::default()).unwrap(); |
471 | + let sv = tx.savepoint(None).unwrap(); |
472 | + let new2 = sv |
473 | + .create_list(MailingList { |
474 | + id: "2".into(), |
475 | + address: "2".into(), |
476 | + ..list.clone() |
477 | + }) |
478 | + .unwrap(); |
479 | + sv.commit().unwrap(); |
480 | + tx.commit().unwrap(); |
481 | + assert_eq!(&db.lists().unwrap(), &[new.clone(), new2.clone()]); |
482 | + |
483 | + /* tx and then rollback savepoint */ |
484 | + let tx = db.transaction(Default::default()).unwrap(); |
485 | + let sv = tx.savepoint(None).unwrap(); |
486 | + let _new3 = sv |
487 | + .create_list(MailingList { |
488 | + id: "3".into(), |
489 | + address: "3".into(), |
490 | + ..list.clone() |
491 | + }) |
492 | + .unwrap(); |
493 | + sv.rollback().unwrap(); |
494 | + tx.commit().unwrap(); |
495 | + assert_eq!(&db.lists().unwrap(), &[new.clone(), new2.clone()]); |
496 | + |
497 | + /* tx, commit savepoint and then rollback commit */ |
498 | + let tx = db.transaction(Default::default()).unwrap(); |
499 | + let sv = tx.savepoint(None).unwrap(); |
500 | + let _new3 = sv |
501 | + .create_list(MailingList { |
502 | + id: "3".into(), |
503 | + address: "3".into(), |
504 | + ..list.clone() |
505 | + }) |
506 | + .unwrap(); |
507 | + sv.commit().unwrap(); |
508 | + tx.rollback().unwrap(); |
509 | + assert_eq!(&db.lists().unwrap(), &[new.clone(), new2.clone()]); |
510 | + |
511 | + /* nested savepoints */ |
512 | + let tx = db.transaction(Default::default()).unwrap(); |
513 | + let sv = tx.savepoint(None).unwrap(); |
514 | + let sv1 = sv.savepoint(None).unwrap(); |
515 | + let new3 = sv1 |
516 | + .create_list(MailingList { |
517 | + id: "3".into(), |
518 | + address: "3".into(), |
519 | + ..list |
520 | + }) |
521 | + .unwrap(); |
522 | + sv1.commit().unwrap(); |
523 | + sv.commit().unwrap(); |
524 | + tx.commit().unwrap(); |
525 | + assert_eq!(&db.lists().unwrap(), &[new, new2, new3]); |
526 | + } |
527 | } |
528 | diff --git a/core/src/lib.rs b/core/src/lib.rs |
529 | index 0d25046..9c3fdbb 100644 |
530 | --- a/core/src/lib.rs |
531 | +++ b/core/src/lib.rs |
532 | @@ -189,7 +189,7 @@ pub mod subscriptions; |
533 | mod templates; |
534 | |
535 | pub use config::{Configuration, SendMail}; |
536 | - pub use connection::*; |
537 | + pub use connection::{transaction, *}; |
538 | pub use errors::*; |
539 | use models::*; |
540 | pub use templates::*; |
541 | diff --git a/core/src/posts.rs b/core/src/posts.rs |
542 | index 36ab575..70eb7e1 100644 |
543 | --- a/core/src/posts.rs |
544 | +++ b/core/src/posts.rs |
545 | @@ -84,7 +84,11 @@ impl Connection { |
546 | } |
547 | |
548 | /// Process a new mailing list post. |
549 | - pub fn post(&mut self, env: &Envelope, raw: &[u8], _dry_run: bool) -> Result<()> { |
550 | + /// |
551 | + /// In case multiple processes can access the database at any time, use an |
552 | + /// `EXCLUSIVE` transaction before calling this function. |
553 | + /// See [`Connection::transaction`]. |
554 | + pub fn post(&self, env: &Envelope, raw: &[u8], _dry_run: bool) -> Result<()> { |
555 | let result = self.inner_post(env, raw, _dry_run); |
556 | if let Err(err) = result { |
557 | return match self.insert_to_queue(QueueEntry::new( |
558 | @@ -115,7 +119,7 @@ impl Connection { |
559 | result |
560 | } |
561 | |
562 | - fn inner_post(&mut self, env: &Envelope, raw: &[u8], _dry_run: bool) -> Result<()> { |
563 | + fn inner_post(&self, env: &Envelope, raw: &[u8], _dry_run: bool) -> Result<()> { |
564 | trace!("Received envelope to post: {:#?}", &env); |
565 | let tos = env.to().to_vec(); |
566 | if tos.is_empty() { |
567 | @@ -295,7 +299,7 @@ impl Connection { |
568 | |
569 | /// Process a new mailing list request. |
570 | pub fn request( |
571 | - &mut self, |
572 | + &self, |
573 | list: &DbVal<MailingList>, |
574 | request: ListRequest, |
575 | env: &Envelope, |
576 | diff --git a/core/src/queue.rs b/core/src/queue.rs |
577 | index c2c96b8..8cb311e 100644 |
578 | --- a/core/src/queue.rs |
579 | +++ b/core/src/queue.rs |
580 | @@ -214,8 +214,8 @@ impl Connection { |
581 | } |
582 | |
583 | /// Delete queue entries returning the deleted values. |
584 | - pub fn delete_from_queue(&mut self, queue: Queue, index: Vec<i64>) -> Result<Vec<QueueEntry>> { |
585 | - let tx = self.connection.transaction()?; |
586 | + pub fn delete_from_queue(&self, queue: Queue, index: Vec<i64>) -> Result<Vec<QueueEntry>> { |
587 | + let tx = self.savepoint(Some(stringify!(delete_from_queue)))?; |
588 | |
589 | let cl = |row: &rusqlite::Row<'_>| { |
590 | Ok(QueueEntry { |
591 | @@ -233,9 +233,11 @@ impl Connection { |
592 | }) |
593 | }; |
594 | let mut stmt = if index.is_empty() { |
595 | - tx.prepare("DELETE FROM queue WHERE which = ? RETURNING *;")? |
596 | + tx.connection |
597 | + .prepare("DELETE FROM queue WHERE which = ? RETURNING *;")? |
598 | } else { |
599 | - tx.prepare("DELETE FROM queue WHERE which = ? AND pk IN rarray(?) RETURNING *;")? |
600 | + tx.connection |
601 | + .prepare("DELETE FROM queue WHERE which = ? AND pk IN rarray(?) RETURNING *;")? |
602 | }; |
603 | let iter = if index.is_empty() { |
604 | stmt.query_map([&queue.as_str()], cl)? |
605 | @@ -279,7 +281,7 @@ mod tests { |
606 | administrators: vec![], |
607 | }; |
608 | |
609 | - let mut db = Connection::open_or_create_db(config).unwrap().trusted(); |
610 | + let db = Connection::open_or_create_db(config).unwrap().trusted(); |
611 | for i in 0..5 { |
612 | db.insert_to_queue( |
613 | QueueEntry::new( |
614 | diff --git a/core/src/subscriptions.rs b/core/src/subscriptions.rs |
615 | index a05c0ea..fce5678 100644 |
616 | --- a/core/src/subscriptions.rs |
617 | +++ b/core/src/subscriptions.rs |
618 | @@ -253,7 +253,7 @@ impl Connection { |
619 | } |
620 | |
621 | /// Accept subscription candidate. |
622 | - pub fn accept_candidate_subscription(&mut self, pk: i64) -> Result<DbVal<ListSubscription>> { |
623 | + pub fn accept_candidate_subscription(&self, pk: i64) -> Result<DbVal<ListSubscription>> { |
624 | let val = self.connection.query_row( |
625 | "INSERT INTO subscription(list, address, name, enabled, digest, verified, \ |
626 | hide_address, receive_duplicates, receive_own_posts, receive_confirmation) SELECT \ |
627 | @@ -311,7 +311,7 @@ impl Connection { |
628 | } |
629 | |
630 | /// Update a mailing list subscription. |
631 | - pub fn update_subscription(&mut self, change_set: ListSubscriptionChangeset) -> Result<()> { |
632 | + pub fn update_subscription(&self, change_set: ListSubscriptionChangeset) -> Result<()> { |
633 | let pk = self |
634 | .list_subscription_by_address(change_set.list, &change_set.address)? |
635 | .pk; |
636 | @@ -347,12 +347,12 @@ impl Connection { |
637 | receive_own_posts, |
638 | receive_confirmation, |
639 | } = change_set; |
640 | - let tx = self.connection.transaction()?; |
641 | + let tx = self.savepoint(Some(stringify!(update_subscription)))?; |
642 | |
643 | macro_rules! update { |
644 | ($field:tt) => {{ |
645 | if let Some($field) = $field { |
646 | - tx.execute( |
647 | + tx.connection.execute( |
648 | concat!( |
649 | "UPDATE subscription SET ", |
650 | stringify!($field), |
651 | @@ -547,7 +547,7 @@ impl Connection { |
652 | } |
653 | |
654 | /// Update an account. |
655 | - pub fn update_account(&mut self, change_set: AccountChangeset) -> Result<()> { |
656 | + pub fn update_account(&self, change_set: AccountChangeset) -> Result<()> { |
657 | let Some(acc) = self.account_by_address(&change_set.address)? else { |
658 | return Err(NotFound("account with this address not found!").into()); |
659 | }; |
660 | @@ -572,12 +572,12 @@ impl Connection { |
661 | password, |
662 | enabled, |
663 | } = change_set; |
664 | - let tx = self.connection.transaction()?; |
665 | + let tx = self.savepoint(Some(stringify!(update_account)))?; |
666 | |
667 | macro_rules! update { |
668 | ($field:tt) => {{ |
669 | if let Some($field) = $field { |
670 | - tx.execute( |
671 | + tx.connection.execute( |
672 | concat!( |
673 | "UPDATE account SET ", |
674 | stringify!($field), |
675 | @@ -616,7 +616,7 @@ mod tests { |
676 | administrators: vec![], |
677 | }; |
678 | |
679 | - let mut db = Connection::open_or_create_db(config).unwrap().trusted(); |
680 | + let db = Connection::open_or_create_db(config).unwrap().trusted(); |
681 | let list = db |
682 | .create_list(MailingList { |
683 | pk: -1, |
684 | diff --git a/core/tests/account.rs b/core/tests/account.rs |
685 | index 25e0bff..0a97f20 100644 |
686 | --- a/core/tests/account.rs |
687 | +++ b/core/tests/account.rs |
688 | @@ -70,7 +70,7 @@ fn test_accounts() { |
689 | assert_eq!(db.queue(Queue::Error).unwrap().len(), 0); |
690 | assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 0); |
691 | |
692 | - let mut db = db.untrusted(); |
693 | + let db = db.untrusted(); |
694 | |
695 | let subscribe_bytes = b"From: Name <user@example.com> |
696 | To: <foo-chat+subscribe@example.com> |
697 | diff --git a/core/tests/error_queue.rs b/core/tests/error_queue.rs |
698 | index f239247..fa33b83 100644 |
699 | --- a/core/tests/error_queue.rs |
700 | +++ b/core/tests/error_queue.rs |
701 | @@ -76,7 +76,7 @@ fn test_error_queue() { |
702 | assert_eq!(db.queue(Queue::Error).unwrap().len(), 0); |
703 | |
704 | // drop privileges |
705 | - let mut db = db.untrusted(); |
706 | + let db = db.untrusted(); |
707 | |
708 | let input_bytes = include_bytes!("./test_sample_longmessage.eml"); |
709 | let envelope = melib::Envelope::from_bytes(input_bytes, None).expect("Could not parse message"); |
710 | diff --git a/core/tests/migrations.rs b/core/tests/migrations.rs |
711 | index 84c6448..f5464ec 100644 |
712 | --- a/core/tests/migrations.rs |
713 | +++ b/core/tests/migrations.rs |
714 | @@ -34,7 +34,7 @@ fn test_init_empty() { |
715 | administrators: vec![], |
716 | }; |
717 | |
718 | - let mut db = Connection::open_or_create_db(config).unwrap().trusted(); |
719 | + let db = Connection::open_or_create_db(config).unwrap().trusted(); |
720 | |
721 | let migrations = Connection::MIGRATIONS; |
722 | if migrations.is_empty() { |
723 | diff --git a/core/tests/smtp.rs b/core/tests/smtp.rs |
724 | index b9a4b44..63160a9 100644 |
725 | --- a/core/tests/smtp.rs |
726 | +++ b/core/tests/smtp.rs |
727 | @@ -39,7 +39,7 @@ fn test_smtp() { |
728 | administrators: vec![], |
729 | }; |
730 | |
731 | - let mut db = Connection::open_or_create_db(config).unwrap().trusted(); |
732 | + let db = Connection::open_or_create_db(config).unwrap().trusted(); |
733 | assert!(db.lists().unwrap().is_empty()); |
734 | let foo_chat = db |
735 | .create_list(MailingList { |
736 | @@ -193,7 +193,7 @@ fn test_smtp_mailcrab() { |
737 | administrators: vec![], |
738 | }; |
739 | |
740 | - let mut db = Connection::open_or_create_db(config).unwrap().trusted(); |
741 | + let db = Connection::open_or_create_db(config).unwrap().trusted(); |
742 | assert!(db.lists().unwrap().is_empty()); |
743 | let foo_chat = db |
744 | .create_list(MailingList { |
745 | diff --git a/core/tests/subscription.rs b/core/tests/subscription.rs |
746 | index d4a1e58..c83f201 100644 |
747 | --- a/core/tests/subscription.rs |
748 | +++ b/core/tests/subscription.rs |
749 | @@ -68,7 +68,7 @@ fn test_list_subscription() { |
750 | assert_eq!(db.queue(Queue::Error).unwrap().len(), 0); |
751 | assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 0); |
752 | |
753 | - let mut db = db.untrusted(); |
754 | + let db = db.untrusted(); |
755 | |
756 | let post_bytes = b"From: Name <user@example.com> |
757 | To: <foo-chat@example.com> |
758 | @@ -193,7 +193,7 @@ fn test_post_rejection() { |
759 | assert_eq!(db.queue(Queue::Error).unwrap().len(), 0); |
760 | assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 0); |
761 | |
762 | - let mut db = db.untrusted(); |
763 | + let db = db.untrusted(); |
764 | |
765 | let post_bytes = b"From: Name <user@example.com> |
766 | To: <foo-chat@example.com> |
767 | diff --git a/web/src/lists.rs b/web/src/lists.rs |
768 | index 00ba71d..bc25716 100644 |
769 | --- a/web/src/lists.rs |
770 | +++ b/web/src/lists.rs |
771 | @@ -346,7 +346,7 @@ pub async fn list_edit_post( |
772 | )); |
773 | }; |
774 | |
775 | - let mut db = db.trusted(); |
776 | + let db = db.trusted(); |
777 | match payload { |
778 | ChangeSetting::PostPolicy { |
779 | delete_post_policy: _, |
780 | diff --git a/web/src/settings.rs b/web/src/settings.rs |
781 | index aa2f693..7a29830 100644 |
782 | --- a/web/src/settings.rs |
783 | +++ b/web/src/settings.rs |
784 | @@ -96,7 +96,7 @@ pub async fn settings_post( |
785 | Form(payload): Form<ChangeSetting>, |
786 | state: Arc<AppState>, |
787 | ) -> Result<Redirect, ResponseError> { |
788 | - let mut db = Connection::open_db(state.conf.clone())?; |
789 | + let db = Connection::open_db(state.conf.clone())?; |
790 | let acc = db |
791 | .account_by_address(&user.address) |
792 | .with_status(StatusCode::BAD_REQUEST)? |
793 | @@ -338,7 +338,7 @@ pub async fn user_list_subscription_post( |
794 | Form(payload): Form<SubscriptionFormPayload>, |
795 | state: Arc<AppState>, |
796 | ) -> Result<Redirect, ResponseError> { |
797 | - let mut db = Connection::open_db(state.conf.clone())?; |
798 | + let db = Connection::open_db(state.conf.clone())?; |
799 | |
800 | let Some(list) = (match id { |
801 | ListPathIdentifier::Pk(id) => db.list(id)?, |