Commit

Author:

Hash:

Timestamp:

+667 -269 +/-23 browse

Kevin Schoon [me@kevinschoon.com]

7032b277c6b082a9b170635ec48ccb9fefe6d7b3

Fri, 01 May 2026 16:44:10 +0000 (1 week ago)

pick back up ayllu-build
pick back up ayllu-build

Replaces the local executor with a Podman based environment and
executes each workflow in a different container.
1diff --git a/.ayllu-build.json b/.ayllu-build.json
2index 9e90a84..af81f82 100644
3--- a/.ayllu-build.json
4+++ b/.ayllu-build.json
5 @@ -2,6 +2,7 @@
6 "workflows": [
7 {
8 "name": "lint",
9+ "image": "localhost/ayllu-build:latest",
10 "steps": [
11 {
12 "name": "cargo-fmt",
13 @@ -11,6 +12,7 @@
14 },
15 {
16 "name": "test",
17+ "image": "localhost/ayllu-build:latest",
18 "depends_on": [
19 "lint"
20 ],
21 @@ -27,6 +29,7 @@
22 },
23 {
24 "name": "build",
25+ "image": "localhost/ayllu-build:latest",
26 "depends_on": [
27 "test"
28 ],
29 @@ -50,4 +53,4 @@
30 ]
31 }
32 ]
33- }
34\ No newline at end of file
35+ }
36 diff --git a/.sqlx/query-111de213fa11fcf9c9b67468f16bff5bbf015907eac9b7f89215522cb0bc6eca.json b/.sqlx/query-111de213fa11fcf9c9b67468f16bff5bbf015907eac9b7f89215522cb0bc6eca.json
37new file mode 100644
38index 0000000..0105e1e
39--- /dev/null
40+++ b/.sqlx/query-111de213fa11fcf9c9b67468f16bff5bbf015907eac9b7f89215522cb0bc6eca.json
41 @@ -0,0 +1,20 @@
42+ {
43+ "db_name": "SQLite",
44+ "query": "INSERT INTO workflows (manifest_id, name, image)\nVALUES (?, ?, ?) RETURNING id\n",
45+ "describe": {
46+ "columns": [
47+ {
48+ "name": "id",
49+ "ordinal": 0,
50+ "type_info": "Integer"
51+ }
52+ ],
53+ "parameters": {
54+ "Right": 3
55+ },
56+ "nullable": [
57+ false
58+ ]
59+ },
60+ "hash": "111de213fa11fcf9c9b67468f16bff5bbf015907eac9b7f89215522cb0bc6eca"
61+ }
62 diff --git a/.sqlx/query-2f04597be35489c3679e00d031c3f7bbdce3963f59220db4ddc993e9ddcf6cca.json b/.sqlx/query-2f04597be35489c3679e00d031c3f7bbdce3963f59220db4ddc993e9ddcf6cca.json
63new file mode 100644
64index 0000000..cb5bfd4
65--- /dev/null
66+++ b/.sqlx/query-2f04597be35489c3679e00d031c3f7bbdce3963f59220db4ddc993e9ddcf6cca.json
67 @@ -0,0 +1,44 @@
68+ {
69+ "db_name": "SQLite",
70+ "query": "SELECT id,\n name,\n image,\n started_at,\n finished_at\nFROM workflows\nWHERE manifest_id = ?\n",
71+ "describe": {
72+ "columns": [
73+ {
74+ "name": "id",
75+ "ordinal": 0,
76+ "type_info": "Integer"
77+ },
78+ {
79+ "name": "name",
80+ "ordinal": 1,
81+ "type_info": "Text"
82+ },
83+ {
84+ "name": "image",
85+ "ordinal": 2,
86+ "type_info": "Text"
87+ },
88+ {
89+ "name": "started_at",
90+ "ordinal": 3,
91+ "type_info": "Integer"
92+ },
93+ {
94+ "name": "finished_at",
95+ "ordinal": 4,
96+ "type_info": "Integer"
97+ }
98+ ],
99+ "parameters": {
100+ "Right": 1
101+ },
102+ "nullable": [
103+ false,
104+ false,
105+ false,
106+ true,
107+ true
108+ ]
109+ },
110+ "hash": "2f04597be35489c3679e00d031c3f7bbdce3963f59220db4ddc993e9ddcf6cca"
111+ }
112 diff --git a/.sqlx/query-56693fef64333ae6ea608de82ee267f9121468f55c383f7a7a9c5fd0de160138.json b/.sqlx/query-56693fef64333ae6ea608de82ee267f9121468f55c383f7a7a9c5fd0de160138.json
113deleted file mode 100644
114index 0f0cc66..0000000
115--- a/.sqlx/query-56693fef64333ae6ea608de82ee267f9121468f55c383f7a7a9c5fd0de160138.json
116+++ /dev/null
117 @@ -1,20 +0,0 @@
118- {
119- "db_name": "SQLite",
120- "query": "INSERT INTO workflows (manifest_id, name)\nVALUES (?, ?) RETURNING id\n",
121- "describe": {
122- "columns": [
123- {
124- "name": "id",
125- "ordinal": 0,
126- "type_info": "Integer"
127- }
128- ],
129- "parameters": {
130- "Right": 2
131- },
132- "nullable": [
133- false
134- ]
135- },
136- "hash": "56693fef64333ae6ea608de82ee267f9121468f55c383f7a7a9c5fd0de160138"
137- }
138 diff --git a/.sqlx/query-56f6d69a4ce0c2f63585378a61150c90b38e2a2f90747da8f487ee5ad8818063.json b/.sqlx/query-56f6d69a4ce0c2f63585378a61150c90b38e2a2f90747da8f487ee5ad8818063.json
139deleted file mode 100644
140index 86e9e52..0000000
141--- a/.sqlx/query-56f6d69a4ce0c2f63585378a61150c90b38e2a2f90747da8f487ee5ad8818063.json
142+++ /dev/null
143 @@ -1,38 +0,0 @@
144- {
145- "db_name": "SQLite",
146- "query": "SELECT id,\n name,\n started_at,\n finished_at\nFROM workflows\nWHERE manifest_id = ?\n",
147- "describe": {
148- "columns": [
149- {
150- "name": "id",
151- "ordinal": 0,
152- "type_info": "Integer"
153- },
154- {
155- "name": "name",
156- "ordinal": 1,
157- "type_info": "Text"
158- },
159- {
160- "name": "started_at",
161- "ordinal": 2,
162- "type_info": "Integer"
163- },
164- {
165- "name": "finished_at",
166- "ordinal": 3,
167- "type_info": "Integer"
168- }
169- ],
170- "parameters": {
171- "Right": 1
172- },
173- "nullable": [
174- false,
175- false,
176- true,
177- true
178- ]
179- },
180- "hash": "56f6d69a4ce0c2f63585378a61150c90b38e2a2f90747da8f487ee5ad8818063"
181- }
182 diff --git a/.sqlx/query-c33cd08f3e6411eb15b05b3e2e08a24eb00a61a16cdad936856c0abb2b460193.json b/.sqlx/query-c33cd08f3e6411eb15b05b3e2e08a24eb00a61a16cdad936856c0abb2b460193.json
183deleted file mode 100644
184index 3e0be8e..0000000
185--- a/.sqlx/query-c33cd08f3e6411eb15b05b3e2e08a24eb00a61a16cdad936856c0abb2b460193.json
186+++ /dev/null
187 @@ -1,38 +0,0 @@
188- {
189- "db_name": "SQLite",
190- "query": "SELECT id, name, started_at, finished_at FROM workflows WHERE id = ?\n",
191- "describe": {
192- "columns": [
193- {
194- "name": "id",
195- "ordinal": 0,
196- "type_info": "Integer"
197- },
198- {
199- "name": "name",
200- "ordinal": 1,
201- "type_info": "Text"
202- },
203- {
204- "name": "started_at",
205- "ordinal": 2,
206- "type_info": "Integer"
207- },
208- {
209- "name": "finished_at",
210- "ordinal": 3,
211- "type_info": "Integer"
212- }
213- ],
214- "parameters": {
215- "Right": 1
216- },
217- "nullable": [
218- false,
219- false,
220- true,
221- true
222- ]
223- },
224- "hash": "c33cd08f3e6411eb15b05b3e2e08a24eb00a61a16cdad936856c0abb2b460193"
225- }
226 diff --git a/.sqlx/query-ee6a6095c3f766b647fa5db4e7c9f7fdff0f62d81230abb537875a7f7a0b2c59.json b/.sqlx/query-ee6a6095c3f766b647fa5db4e7c9f7fdff0f62d81230abb537875a7f7a0b2c59.json
227new file mode 100644
228index 0000000..6022770
229--- /dev/null
230+++ b/.sqlx/query-ee6a6095c3f766b647fa5db4e7c9f7fdff0f62d81230abb537875a7f7a0b2c59.json
231 @@ -0,0 +1,44 @@
232+ {
233+ "db_name": "SQLite",
234+ "query": "SELECT id, name, image, started_at, finished_at FROM workflows WHERE id = ?\n",
235+ "describe": {
236+ "columns": [
237+ {
238+ "name": "id",
239+ "ordinal": 0,
240+ "type_info": "Integer"
241+ },
242+ {
243+ "name": "name",
244+ "ordinal": 1,
245+ "type_info": "Text"
246+ },
247+ {
248+ "name": "image",
249+ "ordinal": 2,
250+ "type_info": "Text"
251+ },
252+ {
253+ "name": "started_at",
254+ "ordinal": 3,
255+ "type_info": "Integer"
256+ },
257+ {
258+ "name": "finished_at",
259+ "ordinal": 4,
260+ "type_info": "Integer"
261+ }
262+ ],
263+ "parameters": {
264+ "Right": 1
265+ },
266+ "nullable": [
267+ false,
268+ false,
269+ false,
270+ true,
271+ true
272+ ]
273+ },
274+ "hash": "ee6a6095c3f766b647fa5db4e7c9f7fdff0f62d81230abb537875a7f7a0b2c59"
275+ }
276 diff --git a/Cargo.lock b/Cargo.lock
277index 0d19863..3ab364f 100644
278--- a/Cargo.lock
279+++ b/Cargo.lock
280 @@ -329,8 +329,10 @@ dependencies = [
281 "ayllu_git",
282 "ayllu_logging",
283 "petgraph",
284+ "reqwest 0.13.3",
285 "serde",
286 "serde_json",
287+ "thiserror 2.0.18",
288 "tokio",
289 "tracing",
290 ]
291 @@ -473,9 +475,9 @@ dependencies = [
292
293 [[package]]
294 name = "bitflags"
295- version = "2.10.0"
296+ version = "2.11.1"
297 source = "registry+https://github.com/rust-lang/crates.io-index"
298- checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
299+ checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3"
300 dependencies = [
301 "serde_core",
302 ]
303 @@ -2127,7 +2129,7 @@ dependencies = [
304 "ayllu_api",
305 "ayllu_cmd",
306 "ayllu_config",
307- "reqwest",
308+ "reqwest 0.12.28",
309 "serde",
310 "thiserror 2.0.18",
311 "tokio",
312 @@ -2310,6 +2312,39 @@ dependencies = [
313 ]
314
315 [[package]]
316+ name = "reqwest"
317+ version = "0.13.3"
318+ source = "registry+https://github.com/rust-lang/crates.io-index"
319+ checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0"
320+ dependencies = [
321+ "base64 0.22.1",
322+ "bytes",
323+ "futures-core",
324+ "h2",
325+ "http",
326+ "http-body",
327+ "http-body-util",
328+ "hyper",
329+ "hyper-rustls",
330+ "hyper-util",
331+ "js-sys",
332+ "log",
333+ "percent-encoding",
334+ "pin-project-lite",
335+ "serde",
336+ "serde_json",
337+ "sync_wrapper",
338+ "tokio",
339+ "tower",
340+ "tower-http",
341+ "tower-service",
342+ "url",
343+ "wasm-bindgen",
344+ "wasm-bindgen-futures",
345+ "web-sys",
346+ ]
347+
348+ [[package]]
349 name = "ring"
350 version = "0.17.14"
351 source = "registry+https://github.com/rust-lang/crates.io-index"
352 @@ -2488,9 +2523,9 @@ dependencies = [
353
354 [[package]]
355 name = "security-framework-sys"
356- version = "2.14.0"
357+ version = "2.17.0"
358 source = "registry+https://github.com/rust-lang/crates.io-index"
359- checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32"
360+ checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3"
361 dependencies = [
362 "core-foundation-sys",
363 "libc",
364 @@ -3642,7 +3677,7 @@ dependencies = [
365 "http",
366 "nutype",
367 "percent-encoding",
368- "reqwest",
369+ "reqwest 0.12.28",
370 "serde",
371 "serde_json",
372 "serde_with",
373 diff --git a/Containerfile.build b/Containerfile.build
374new file mode 100644
375index 0000000..1442266
376--- /dev/null
377+++ b/Containerfile.build
378 @@ -0,0 +1,6 @@
379+ FROM docker.io/alpine:3.23
380+
381+ RUN apk add cargo rust pkgconf build-base git
382+ RUN adduser -D -h /src -s /bin/sh ayllu
383+ USER ayllu
384+ WORKDIR /src
385 diff --git a/ayllu-build/Cargo.toml b/ayllu-build/Cargo.toml
386index 7bc3fc6..fba8c44 100644
387--- a/ayllu-build/Cargo.toml
388+++ b/ayllu-build/Cargo.toml
389 @@ -20,4 +20,6 @@ tracing = { workspace = true }
390 serde = { workspace = true }
391 petgraph = { workspace = true }
392 tokio = { workspace = true }
393+ thiserror = { workspace = true }
394 serde_json = "1.0.149"
395+ reqwest = { version = "0.13.3", default-features = false, features = ["json", "http2"] }
396 diff --git a/ayllu-build/src/error.rs b/ayllu-build/src/error.rs
397index e5a5551..bcf6100 100644
398--- a/ayllu-build/src/error.rs
399+++ b/ayllu-build/src/error.rs
400 @@ -28,6 +28,7 @@ pub enum Error {
401 // general io error during execution
402 Io(std::io::Error),
403 Db(ayllu_database::Error),
404+ Libpod,
405 }
406
407 impl std::fmt::Display for Error {
408 @@ -52,6 +53,7 @@ impl std::fmt::Display for Error {
409 Error::RepositoryDoesNotContainManifest { path } => {
410 write!(f, "Repository {path:?} does not contain a manifest")
411 }
412+ Error::Libpod => todo!(),
413 }
414 }
415 }
416 @@ -69,3 +71,11 @@ impl From<std::io::Error> for Error {
417 Error::Io(value)
418 }
419 }
420+
421+ impl From<reqwest::Error> for Error {
422+ fn from(value: reqwest::Error) -> Self {
423+ // FIXME
424+ tracing::error!("{value}");
425+ Error::Libpod
426+ }
427+ }
428 diff --git a/ayllu-build/src/evaluate.rs b/ayllu-build/src/evaluate.rs
429index cfda07a..21b6b98 100644
430--- a/ayllu-build/src/evaluate.rs
431+++ b/ayllu-build/src/evaluate.rs
432 @@ -15,7 +15,7 @@ use crate::{
433 };
434 use crate::{
435 error::Error,
436- executor::{Context, Executor, Local},
437+ executor::{Context, Executor},
438 };
439 use ayllu_database::{
440 Tx, Wrapper as Database,
441 @@ -153,7 +153,9 @@ impl Runtime {
442 name: workflow.name.clone(),
443 });
444 }
445- let workflow_id = tx.create_workflow(manifest_id, &workflow.name).await?;
446+ let workflow_id = tx
447+ .create_workflow(manifest_id, &workflow.name, &workflow.image)
448+ .await?;
449 workflows_by_name.insert(
450 workflow.name.clone(),
451 graph.add_node(Unit::Workflow(workflow_id)),
452 @@ -222,7 +224,7 @@ impl Runtime {
453 }
454
455 /// Allocate a job graph and then execute it sequentially
456- pub async fn evaluate(&self) -> Result<i64, Error> {
457+ pub async fn evaluate(&self, executor: &impl Executor) -> Result<i64, Error> {
458 let mut tx = self.db.begin().await?;
459 let (manifest_id, graph) = match self.allocate(&mut tx).await {
460 Ok(db_op) => {
461 @@ -236,6 +238,7 @@ impl Runtime {
462 return Err(e);
463 }
464 };
465+
466 let dot_string = Dot::new(&graph).to_string();
467 tracing::info!(
468 "evaluating DAG [{}] [n_nodes={}]:\n{}",
469 @@ -258,22 +261,21 @@ impl Runtime {
470 match current_workflow.as_ref() {
471 Some(current_workflow_id) => {
472 self.db.update_workflow_finish(*current_workflow_id).await?;
473+ executor.shutdown(manifest_id, *current_workflow_id).await?;
474 }
475 None => {
476 self.db.update_workflow_start(next_workflow.id).await?;
477 }
478 };
479 current_workflow = Some(*next_workflow_id);
480+ executor
481+ .initialize(manifest_id, *next_workflow_id, &next_workflow.image)
482+ .await?;
483 }
484 Unit::Step(next_step_id) => {
485 let next_step = self.db.read_step(*next_step_id).await?;
486 tracing::info!("starting step: {}", next_step.name);
487 self.db.update_step_start(next_step.id).await?;
488- let executor = Local {
489- temp_dir: self.work_dir.clone(), // FIXME: Eliminate file based logging all together
490- tee_output: self.tee_output,
491- };
492-
493 // TODO: more context
494 let ctx = Context {
495 manifest_id,
496 @@ -282,18 +284,10 @@ impl Runtime {
497 repo_url: self.source.url(),
498 ..Default::default()
499 };
500-
501- let (lines, exit_code) = executor.execute(&next_step, ctx)?;
502- if !exit_code.success() {
503- tracing::warn!("step {} has failed: {:?}", next_step.name, exit_code);
504- }
505+ let (lines, exit_code) = executor.execute(&next_step, &ctx).await?;
506 // let (step_id, duration) = state.current_step().unwrap();
507 self.db
508- .update_step_finish(
509- *next_step_id,
510- lines.as_slice(),
511- exit_code.code().unwrap() as i8,
512- )
513+ .update_step_finish(*next_step_id, lines.as_slice(), exit_code as i8)
514 .await?;
515 }
516 }
517 diff --git a/ayllu-build/src/executor.rs b/ayllu-build/src/executor.rs
518index 7b84627..8e78e69 100644
519--- a/ayllu-build/src/executor.rs
520+++ b/ayllu-build/src/executor.rs
521 @@ -1,17 +1,7 @@
522 use std::collections::HashMap;
523- use std::env;
524- use std::fs::OpenOptions;
525- use std::io::{BufRead, BufReader};
526- use std::io::{Read, Write};
527- use std::mem::take;
528- use std::path::{Path, PathBuf};
529- use std::process::{Command, ExitStatus, Stdio};
530- use std::thread;
531- use std::time::Instant;
532
533- use ayllu_database::build::{LogLine, Output, Step};
534+ use ayllu_database::build::{LogLine, Step};
535 use serde::Deserialize;
536- use tracing::log::{debug, info};
537
538 use crate::error::Error;
539
540 @@ -30,132 +20,14 @@ pub struct Context {
541 // An executor runs the step in a process container of some kind. Currently
542 // the only executor that exists is the Local exector.
543 pub trait Executor {
544- fn execute(&self, step: &Step, context: Context) -> Result<(Vec<LogLine>, ExitStatus), Error>;
545- }
546-
547- // build executor that runs with the same permissions as the build server.
548- pub struct Local {
549- pub temp_dir: PathBuf,
550- pub tee_output: bool,
551- }
552-
553- fn write_stream(stream: impl Read, filename: &Path, output: Output) -> std::io::Result<()> {
554- tracing::info!("Allocating new log file {filename:?}");
555- let mut file = OpenOptions::new()
556- .create(true)
557- .write(true)
558- .truncate(false)
559- .open(filename)?;
560- let buf_reader = BufReader::new(stream);
561- let start = Instant::now();
562-
563- let mut lines = buf_reader.lines();
564- loop {
565- match lines.next() {
566- Some(Ok(line)) => {
567- let log_line = LogLine {
568- output: output.clone(),
569- runtime: Instant::now().duration_since(start),
570- line,
571- };
572- let line_str = serde_json::ser::to_string(&log_line).unwrap();
573- file.write_all(line_str.as_bytes())?;
574- file.write_all("\n".as_bytes())?;
575- }
576- Some(Err(e)) => return Err(e),
577- None => break,
578- }
579- }
580- Ok(())
581- }
582-
583- impl Executor for Local {
584- // TODO: once parallelism is enabled this should be converted to tokio
585- fn execute(&self, step: &Step, context: Context) -> Result<(Vec<LogLine>, ExitStatus), Error> {
586- let work_dir = self
587- .temp_dir
588- .as_path()
589- .join(context.manifest_id.to_string());
590- let src_dir = work_dir.join("src");
591- std::fs::create_dir_all(&src_dir)?;
592- let log_dir = work_dir.join("logs");
593- std::fs::create_dir_all(&log_dir)?;
594- if !ayllu_git::git_dir(&src_dir)? {
595- tracing::info!("Cloning repository {} to {work_dir:?}", context.repo_url);
596- ayllu_git::clone(&context.repo_url, src_dir.as_path(), None).unwrap();
597- }
598- let log_dir = self
599- .temp_dir
600- .as_path()
601- .join(context.manifest_id.to_string())
602- .join(context.workflow_id.to_string())
603- .join(context.step_id.to_string());
604-
605- std::fs::create_dir_all(&log_dir)?;
606-
607- // TODO: add env filter to config
608- let mut filtered_env: HashMap<String, String> = env::vars()
609- .filter(|(k, _)| k == "TERM" || k == "TZ" || k == "LANG" || k == "PATH")
610- .collect();
611- // FIXME: If user specified None for an env it should override the defaults
612- filtered_env.extend(
613- context
614- .environment
615- .iter()
616- .filter_map(|env| env.1.as_ref().map(|value| (env.0.clone(), value.clone()))),
617- );
618- filtered_env.extend([
619- (String::from("AYLLU_GIT_HASH"), context.git_hash.clone()),
620- (String::from("AYLLU_REPO_URL"), context.repo_url.clone()),
621- ]);
622- info!("starting command: {} {}", step.shell, step.input);
623-
624- // TODO: update this to write to files / support "tee" style logging
625- // when debugging output is enabled.
626- let mut proc = Command::new(step.shell.clone())
627- .arg("-c")
628- .arg(step.input.clone())
629- .envs(filtered_env)
630- .current_dir(&src_dir)
631- .stdout(Stdio::piped())
632- .stderr(Stdio::piped())
633- .spawn()?;
634-
635- let proc_stdout = take(&mut proc.stdout).expect("cannot get stdout");
636- let stdout_log_path = log_dir.clone().join("stdout.json");
637- let stdout_thd = thread::spawn(move || {
638- write_stream(proc_stdout, stdout_log_path.as_path(), Output::Stdout).unwrap();
639- });
640- let proc_stderr = take(&mut proc.stderr).expect("cannot get stderr");
641- let stderr_log_path = log_dir.clone().join("stderr.json");
642- let stderr_thd = thread::spawn(move || {
643- write_stream(proc_stderr, stderr_log_path.as_path(), Output::Stderr).unwrap();
644- });
645-
646- stdout_thd.join().unwrap();
647- stderr_thd.join().unwrap();
648-
649- debug!("waiting for process to complete");
650- let status_code = proc.wait().expect("failed to wait on process");
651-
652- let stdout_content = std::fs::read_to_string(log_dir.join("stdout.json").as_path())?;
653- let stderr_content = std::fs::read_to_string(log_dir.join("stderr.json").as_path())?;
654-
655- let mut log_lines: Vec<LogLine> = stdout_content
656- .lines()
657- .map(|content| serde_json::de::from_str::<LogLine>(content).unwrap())
658- .collect();
659-
660- let stderr_lines: Vec<LogLine> = stderr_content
661- .lines()
662- .map(|content| serde_json::de::from_str::<LogLine>(content).unwrap())
663- .collect();
664-
665- log_lines.extend(stderr_lines);
666- log_lines.sort_by(|first, second| first.runtime.cmp(&second.runtime));
667-
668- Ok((log_lines, status_code))
669- }
670+ async fn initialize(
671+ &self,
672+ manifest_id: i64,
673+ workflow_id: i64,
674+ image: &str,
675+ ) -> Result<(), Error>;
676+ async fn execute(&self, step: &Step, context: &Context) -> Result<(Vec<LogLine>, i32), Error>;
677+ async fn shutdown(&self, manifest_id: i64, workflow_id: i64) -> Result<(), Error>;
678 }
679
680 #[cfg(test)]
681 diff --git a/ayllu-build/src/executor_libpod.rs b/ayllu-build/src/executor_libpod.rs
682new file mode 100644
683index 0000000..59797ea
684--- /dev/null
685+++ b/ayllu-build/src/executor_libpod.rs
686 @@ -0,0 +1,119 @@
687+ use std::{path::Path, time::Instant};
688+
689+ use ayllu_database::build::LogLine;
690+ use tokio::sync::mpsc;
691+
692+ use crate::libpod::{Client, Request, container};
693+
694+ pub struct Libpod {
695+ client: Client,
696+ }
697+
698+ impl Libpod {
699+ pub fn new(socket_path: &Path) -> Self {
700+ Self {
701+ client: Client::new(socket_path),
702+ }
703+ }
704+ }
705+
706+ impl super::executor::Executor for Libpod {
707+ async fn initialize(
708+ &self,
709+ manifest_id: i64,
710+ workflow_id: i64,
711+ image: &str,
712+ ) -> Result<(), crate::error::Error> {
713+ let container_name = format!("ayllu-build-{manifest_id}-{workflow_id}");
714+ assert!(
715+ self.client
716+ .call(&Request::Ping)
717+ .await?
718+ .status()
719+ .is_success()
720+ );
721+ if !self
722+ .client
723+ .call(&Request::ContainerExists(container::Exists {
724+ name: container_name.clone(),
725+ }))
726+ .await?
727+ .status()
728+ .is_success()
729+ {
730+ self.client
731+ .call(&Request::CreateContainer(container::Create {
732+ name: container_name.clone(),
733+ image: image.to_string(),
734+ command: vec![String::from("sleep"), String::from("inf")],
735+ }))
736+ .await?;
737+ }
738+
739+ self.client
740+ .call(&Request::StartContainer(container::Start {
741+ name: container_name.clone(),
742+ }))
743+ .await?;
744+
745+ Ok(())
746+ }
747+
748+ async fn execute(
749+ &self,
750+ step: &ayllu_database::build::Step,
751+ context: &crate::executor::Context,
752+ ) -> Result<(Vec<ayllu_database::build::LogLine>, i32), crate::error::Error> {
753+ // FIXME stream directly into the database because it will result in
754+ // fast log output in the frontend.
755+ let (tx, mut rx) = mpsc::channel::<(crate::libpod::Stream, String)>(1);
756+ let (tx_signal, rx_signal) = tokio::sync::oneshot::channel::<i32>();
757+ let shell = step.shell.to_string();
758+ let input = step.input.to_string();
759+ let client = self.client.clone();
760+ let ctx = context.clone();
761+ tokio::spawn(async move {
762+ client
763+ .exec(&ctx, &shell, &input, tx, tx_signal)
764+ .await
765+ .unwrap()
766+ });
767+ let mut lines: Vec<LogLine> = Vec::new();
768+ let start = Instant::now();
769+ while let Some(msg) = rx.recv().await {
770+ match msg.0 {
771+ crate::libpod::Stream::Stdout => tracing::info!("{}", msg.1),
772+ crate::libpod::Stream::Stderr => tracing::warn!("{}", msg.1),
773+ _ => unreachable!(),
774+ }
775+ lines.push(LogLine {
776+ output: match msg.0 {
777+ crate::libpod::Stream::Stdout => ayllu_database::build::Output::Stdout,
778+ crate::libpod::Stream::Stderr => ayllu_database::build::Output::Stderr,
779+ _ => unimplemented!(),
780+ },
781+ runtime: Instant::now().duration_since(start),
782+ line: msg.1,
783+ });
784+ }
785+
786+ let signal = rx_signal.await.unwrap();
787+ if signal == 0 {
788+ tracing::info!("Process executed normally")
789+ } else {
790+ tracing::warn!("Non-zero exit code: {signal}")
791+ }
792+
793+ // self.client.call().await.unwrap();
794+ Ok((lines, signal))
795+ }
796+
797+ async fn shutdown(
798+ &self,
799+ _manifest_id: i64,
800+ _workflow_id: i64,
801+ ) -> Result<(), crate::error::Error> {
802+ // TODO
803+ Ok(())
804+ }
805+ }
806 diff --git a/ayllu-build/src/libpod.rs b/ayllu-build/src/libpod.rs
807new file mode 100644
808index 0000000..f203f6f
809--- /dev/null
810+++ b/ayllu-build/src/libpod.rs
811 @@ -0,0 +1,304 @@
812+ use std::{collections::HashMap, path::Path};
813+
814+ use serde_json::json;
815+ use tokio::{
816+ io::{AsyncReadExt, AsyncWriteExt},
817+ sync::mpsc::Sender,
818+ };
819+
820+ use reqwest::{
821+ Body, ClientBuilder, RequestBuilder, Url,
822+ header::{HeaderMap, HeaderName, HeaderValue},
823+ };
824+ use serde::{Deserialize, Serialize};
825+
826+ use crate::executor::Context;
827+
828+ #[derive(Debug)]
829+ pub enum Stream {
830+ Stdin,
831+ Stdout,
832+ Stderr,
833+ }
834+
835+ pub mod container {
836+
837+ use super::*;
838+
839+ #[derive(Serialize, Debug)]
840+ pub struct Create {
841+ pub name: String,
842+ pub image: String,
843+ pub command: Vec<String>,
844+ }
845+
846+ #[derive(Serialize, Debug)]
847+ pub struct Exists {
848+ pub name: String,
849+ }
850+
851+ #[derive(Serialize, Debug)]
852+ pub struct Start {
853+ pub name: String,
854+ }
855+
856+ #[derive(Serialize, Debug)]
857+ pub struct Attach {
858+ pub name: String,
859+ }
860+
861+ pub struct Header;
862+
863+ impl Header {
864+ pub fn write(stream: Stream, size: u32) -> [u8; 8] {
865+ let mut buf: [u8; 8] = [0, 0, 0, 0, 0, 0, 0, 0];
866+ match stream {
867+ Stream::Stdin => buf[0] = 0,
868+ Stream::Stdout => buf[0] = 1,
869+ Stream::Stderr => buf[0] = 2,
870+ };
871+ let size = size.to_be_bytes();
872+ buf[4] = size[0];
873+ buf[4] = size[1];
874+ buf[4] = size[2];
875+ buf[4] = size[3];
876+ buf
877+ }
878+
879+ pub fn read(input: &[u8; 8]) -> (Stream, u32) {
880+ let stream = match input[0] {
881+ 0 => Stream::Stdin,
882+ 1 => Stream::Stdout,
883+ 2 => Stream::Stderr,
884+ _ => unreachable!(),
885+ };
886+ let size = u32::from_be_bytes([input[4], input[5], input[6], input[7]]);
887+ (stream, size)
888+ }
889+ }
890+ }
891+
892+ pub mod exec {
893+
894+ use super::*;
895+
896+ #[derive(Serialize, Debug)]
897+ pub struct Create {
898+ pub name: String,
899+ #[serde(rename = "Cmd")]
900+ pub command: Vec<String>,
901+ }
902+
903+ #[derive(Serialize, Debug)]
904+ pub struct Start {
905+ pub id: String,
906+ }
907+
908+ #[derive(Serialize, Debug)]
909+ pub struct Inspect {
910+ pub id: String,
911+ }
912+
913+ #[derive(Deserialize, Debug)]
914+ pub struct InspectResponse {
915+ #[serde(rename = "ExitCode")]
916+ pub exit_code: i32,
917+ }
918+ }
919+
920+ pub mod images {
921+ use super::*;
922+
923+ #[derive(Serialize, Debug)]
924+ pub struct Pull {}
925+
926+ #[derive(Serialize, Debug)]
927+ pub struct Exists {}
928+ }
929+
930+ #[derive(Debug)]
931+ pub enum Request {
932+ Ping,
933+ CreateContainer(container::Create),
934+ ContainerExists(container::Exists),
935+ ContainerAttach(container::Attach),
936+ StartContainer(container::Start),
937+ CreateExec(exec::Create),
938+ StartExec(exec::Start),
939+ InspectExec(exec::Inspect),
940+ ImageExists(images::Exists),
941+ PullImage(images::Pull),
942+ }
943+
944+ impl Request {
945+ fn build(&self, client: &reqwest::Client) -> reqwest::RequestBuilder {
946+ match self {
947+ Request::Ping => client.get(Url::parse("http://_/libpod/_ping").unwrap()),
948+ Request::CreateContainer(create) => client
949+ .post(Url::parse("http://_/v6.0.0/libpod/containers/create").unwrap())
950+ .json(create),
951+ Request::ContainerExists(exists) => client.get(
952+ Url::parse(&format!(
953+ "http://_/libpod/containers/{}/exists",
954+ exists.name
955+ ))
956+ .unwrap(),
957+ ),
958+ Request::StartContainer(start) => client.post(
959+ Url::parse(&format!(
960+ "http://_/v6.0.0/libpod/containers/{}/start",
961+ start.name
962+ ))
963+ .unwrap(),
964+ ),
965+ Request::ContainerAttach(attach) => {
966+ let headers = HeaderMap::from_iter(vec![
967+ (
968+ HeaderName::from_static("connection"),
969+ HeaderValue::from_static("upgrade"),
970+ ),
971+ (
972+ HeaderName::from_static("upgrade"),
973+ HeaderValue::from_static("tcp"),
974+ ),
975+ ]);
976+ client
977+ .post(
978+ Url::parse_with_params(
979+ &format!("http://_/v6.0.0/libpod/containers/{}/attach", attach.name),
980+ &[
981+ ("stream", true.to_string()),
982+ ("stdout", true.to_string()),
983+ ("stderr", true.to_string()),
984+ ("stdin", true.to_string()),
985+ ],
986+ )
987+ .unwrap(),
988+ )
989+ .headers(headers)
990+ }
991+ Request::CreateExec(create) => client
992+ .post(
993+ Url::parse(&format!(
994+ "http://_/v6.0.0/libpod/containers/{}/exec",
995+ create.name
996+ ))
997+ .unwrap(),
998+ )
999+ .json(&json!({
1000+ "Cmd": create.command,
1001+ "AttachStdout": true,
1002+ "AttachStderr": true,
1003+ "AttachStdin": false,
1004+ })),
1005+ Request::StartExec(start) => {
1006+ let headers = HeaderMap::from_iter(vec![
1007+ (
1008+ HeaderName::from_static("connection"),
1009+ HeaderValue::from_static("upgrade"),
1010+ ),
1011+ (
1012+ HeaderName::from_static("upgrade"),
1013+ HeaderValue::from_static("tcp"),
1014+ ),
1015+ ]);
1016+ client
1017+ .post(
1018+ Url::parse(&format!("http://_/v6.0.0/libpod/exec/{}/start", start.id))
1019+ .unwrap(),
1020+ )
1021+ .headers(headers)
1022+ .json(&json!({"Detach": false}))
1023+ }
1024+ Request::InspectExec(inspect) => client.get(
1025+ Url::parse(&format!("http://_/v6.0.0/libpod/exec/{}/json", inspect.id)).unwrap(),
1026+ ),
1027+ Request::ImageExists(exists) => todo!(),
1028+ Request::PullImage(pull) => todo!(),
1029+ }
1030+ }
1031+ }
1032+
1033+ pub type Error = reqwest::Error;
1034+
1035+ #[derive(Clone)]
1036+ pub struct Client {
1037+ inner: reqwest::Client,
1038+ }
1039+
1040+ impl Client {
1041+ pub fn new(path: &Path) -> Self {
1042+ let builder = ClientBuilder::new().unix_socket(path);
1043+ let inner = builder.build().unwrap();
1044+ Self { inner }
1045+ }
1046+
1047+ pub async fn call(&self, req: &Request) -> Result<reqwest::Response, Error> {
1048+ tracing::debug!("making libpod request: {req:?}");
1049+ let req = req.build(&self.inner);
1050+ let req = req.build()?;
1051+ let url = req.url().clone();
1052+ let res = self.inner.execute(req).await?;
1053+ if !res.status().is_server_error() {
1054+ tracing::info!("libpod: {}: {}", url, res.status());
1055+ } else {
1056+ tracing::error!("libpod: {}: {}", url, res.status());
1057+ }
1058+ Ok(res)
1059+ }
1060+
1061+ pub async fn exec(
1062+ &self,
1063+ ctx: &Context,
1064+ shell: &str,
1065+ input: &str,
1066+ ch: Sender<(Stream, String)>,
1067+ signal: tokio::sync::oneshot::Sender<i32>,
1068+ ) -> Result<(), Error> {
1069+ let name = format!("ayllu-build-{}-{}", ctx.manifest_id, ctx.workflow_id);
1070+ let res = self
1071+ .call(&Request::CreateExec(exec::Create {
1072+ name: name.to_string(),
1073+ command: vec![shell.to_string(), "-c".to_string(), input.to_string()],
1074+ }))
1075+ .await?;
1076+ let res = res.json::<HashMap<String, String>>().await.unwrap();
1077+ let id = res.get("Id").unwrap();
1078+ tracing::info!("created exec id {id}");
1079+ let res = self
1080+ .call(&Request::StartExec(exec::Start { id: id.clone() }))
1081+ .await?;
1082+ assert!(res.status().as_u16() == 101); // FIXME
1083+ let conn = res.upgrade().await?;
1084+ let mut reader = tokio::io::BufReader::new(conn);
1085+
1086+ loop {
1087+ let mut header: [u8; 8] = [0, 0, 0, 0, 0, 0, 0, 0];
1088+ match reader.read_exact(&mut header).await {
1089+ Ok(_) => {
1090+ let (stream, size) = container::Header::read(&header);
1091+ tracing::debug!("read stream: {stream:?} {size}");
1092+ let mut buf = vec![0; size as usize];
1093+ reader.read_exact(buf.as_mut_slice()).await.unwrap();
1094+ let line = String::from_utf8_lossy(buf.as_slice());
1095+ ch.send((stream, line.to_string())).await.unwrap();
1096+ }
1097+ Err(e) => match e.kind() {
1098+ tokio::io::ErrorKind::UnexpectedEof => break,
1099+ e => {
1100+ tracing::error!("Libpod io error: {e:?}");
1101+ unimplemented!()
1102+ }
1103+ },
1104+ }
1105+ }
1106+
1107+ let res = self
1108+ .call(&Request::InspectExec(exec::Inspect { id: id.clone() }))
1109+ .await?
1110+ .json::<exec::InspectResponse>()
1111+ .await?;
1112+ signal.send(res.exit_code).unwrap();
1113+ Ok(())
1114+ }
1115+ }
1116 diff --git a/ayllu-build/src/main.rs b/ayllu-build/src/main.rs
1117index 0203ddc..72f384d 100644
1118--- a/ayllu-build/src/main.rs
1119+++ b/ayllu-build/src/main.rs
1120 @@ -10,6 +10,9 @@ mod config;
1121 mod error;
1122 mod evaluate;
1123 mod executor;
1124+ mod executor_libpod;
1125+ // mod executor_local;
1126+ mod libpod;
1127 mod models;
1128
1129 const DEFAULT_BUILD_FILE: &str = ".ayllu-build.json";
1130 @@ -24,6 +27,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1131 alternate_path,
1132 tee_output,
1133 } => {
1134+ let workdir = cfg.builder.work_dir.clone();
1135 let db = Builder::default()
1136 .path(cfg.database.path.as_path())
1137 .log_queries(false) // FIXME
1138 @@ -35,9 +39,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1139 db,
1140 source: Source::Path(source_path),
1141 tee_output,
1142- work_dir: cfg.builder.work_dir,
1143+ work_dir: workdir.clone(),
1144 };
1145- rt.evaluate().await?;
1146+
1147+ let executor = executor_libpod::Libpod::new(Path::new("/tmp/podman.sock"));
1148+ rt.evaluate(&executor).await?;
1149 Ok(())
1150 }
1151 }
1152 diff --git a/ayllu-build/src/models.rs b/ayllu-build/src/models.rs
1153index 1f1a769..ad97ea1 100644
1154--- a/ayllu-build/src/models.rs
1155+++ b/ayllu-build/src/models.rs
1156 @@ -35,6 +35,7 @@ impl Step {
1157 #[derive(Deserialize, Debug, Clone)]
1158 pub struct Workflow {
1159 pub name: String,
1160+ pub image: String,
1161 pub steps: Vec<Step>,
1162 #[serde(default = "Vec::new")]
1163 pub depends_on: Vec<String>,
1164 diff --git a/crates/cmd/src/build.rs b/crates/cmd/src/build.rs
1165index 47135fa..659e56f 100644
1166--- a/crates/cmd/src/build.rs
1167+++ b/crates/cmd/src/build.rs
1168 @@ -2,6 +2,7 @@ use std::path::PathBuf;
1169
1170 use clap::{Parser, Subcommand};
1171 use tracing::Level;
1172+ use url::Url;
1173
1174 const LONG_ABOUT_DESCRIPTION: &str = r#"
1175
1176 @@ -29,6 +30,16 @@ pub struct Command {
1177 pub command: Commands,
1178 }
1179
1180+ pub enum Source {
1181+ LocalPath(PathBuf),
1182+ RemoteUrl(Url),
1183+ }
1184+
1185+ pub enum Environment {
1186+ Local,
1187+ Libpod,
1188+ }
1189+
1190 #[derive(Subcommand, Debug, PartialEq)]
1191 pub enum Commands {
1192 /// evaluate a local build script
1193 diff --git a/crates/database/queries/workflows_create.sql b/crates/database/queries/workflows_create.sql
1194index 0063b5f..ac4b392 100644
1195--- a/crates/database/queries/workflows_create.sql
1196+++ b/crates/database/queries/workflows_create.sql
1197 @@ -1,2 +1,2 @@
1198- INSERT INTO workflows (manifest_id, name)
1199- VALUES (?, ?) RETURNING id
1200+ INSERT INTO workflows (manifest_id, name, image)
1201+ VALUES (?, ?, ?) RETURNING id
1202 diff --git a/crates/database/queries/workflows_list.sql b/crates/database/queries/workflows_list.sql
1203index 7ba11c6..8914be1 100644
1204--- a/crates/database/queries/workflows_list.sql
1205+++ b/crates/database/queries/workflows_list.sql
1206 @@ -1,5 +1,6 @@
1207 SELECT id,
1208 name,
1209+ image,
1210 started_at,
1211 finished_at
1212 FROM workflows
1213 diff --git a/crates/database/queries/workflows_read.sql b/crates/database/queries/workflows_read.sql
1214index a772b4b..9335ff0 100644
1215--- a/crates/database/queries/workflows_read.sql
1216+++ b/crates/database/queries/workflows_read.sql
1217 @@ -1 +1 @@
1218- SELECT id, name, started_at, finished_at FROM workflows WHERE id = ?
1219+ SELECT id, name, image, started_at, finished_at FROM workflows WHERE id = ?
1220 diff --git a/crates/database/src/build.rs b/crates/database/src/build.rs
1221index dcc1675..b2fc00f 100644
1222--- a/crates/database/src/build.rs
1223+++ b/crates/database/src/build.rs
1224 @@ -86,6 +86,7 @@ impl Manifest {
1225 pub struct Workflow {
1226 pub id: i64,
1227 pub name: String,
1228+ pub image: String,
1229 pub started_at: Option<i64>,
1230 pub finished_at: Option<i64>,
1231 }
1232 @@ -151,7 +152,12 @@ pub trait BuildTx {
1233 name: &str,
1234 git_hash: &str,
1235 ) -> Result<i64, Error>;
1236- async fn create_workflow(&mut self, manifest_id: i64, name: &str) -> Result<i64, Error>;
1237+ async fn create_workflow(
1238+ &mut self,
1239+ manifest_id: i64,
1240+ name: &str,
1241+ image: &str,
1242+ ) -> Result<i64, Error>;
1243 async fn create_step(
1244 &mut self,
1245 job_id: i64,
1246 @@ -178,8 +184,13 @@ impl BuildTx for Tx {
1247 Ok(ret.id)
1248 }
1249
1250- async fn create_workflow(&mut self, manifest_id: i64, name: &str) -> Result<i64, Error> {
1251- let ret = sqlx::query_file!("queries/workflows_create.sql", manifest_id, name)
1252+ async fn create_workflow(
1253+ &mut self,
1254+ manifest_id: i64,
1255+ name: &str,
1256+ image: &str,
1257+ ) -> Result<i64, Error> {
1258+ let ret = sqlx::query_file!("queries/workflows_create.sql", manifest_id, name, image)
1259 .fetch_one(&mut *self.inner)
1260 .await?;
1261 Ok(ret.id)
1262 @@ -242,7 +253,12 @@ pub trait BuildExt {
1263 ) -> Result<ManifestView, Error>;
1264
1265 // workflows
1266- async fn create_workflow(&self, manifest_id: i64, name: &str) -> Result<i64, Error>;
1267+ async fn create_workflow(
1268+ &self,
1269+ manifest_id: i64,
1270+ name: &str,
1271+ image: &str,
1272+ ) -> Result<i64, Error>;
1273 async fn update_workflow_start(&self, workflow_id: i64) -> Result<(), Error>;
1274 async fn update_workflow_finish(&self, workflow_id: i64) -> Result<(), Error>;
1275 async fn read_workflow(&self, workflow_id: i64) -> Result<Workflow, Error>;
1276 @@ -470,8 +486,13 @@ impl BuildExt for Database {
1277 Ok(WorkflowView { workflow, steps })
1278 }
1279
1280- async fn create_workflow(&self, manifest_id: i64, name: &str) -> Result<i64, Error> {
1281- let ret = sqlx::query_file!("queries/workflows_create.sql", manifest_id, name)
1282+ async fn create_workflow(
1283+ &self,
1284+ manifest_id: i64,
1285+ name: &str,
1286+ image: &str,
1287+ ) -> Result<i64, Error> {
1288+ let ret = sqlx::query_file!("queries/workflows_create.sql", manifest_id, name, image)
1289 .fetch_one(&self.pool)
1290 .await?;
1291 Ok(ret.id)
1292 diff --git a/migrations/20231204194038_init.sql b/migrations/20231204194038_init.sql
1293index 4528e18..79b5e5e 100644
1294--- a/migrations/20231204194038_init.sql
1295+++ b/migrations/20231204194038_init.sql
1296 @@ -12,6 +12,7 @@ CREATE TABLE workflows (
1297 id INTEGER PRIMARY KEY NOT NULL,
1298 manifest_id INTEGER REFERENCES manifests(id) ON DELETE CASCADE NOT NULL,
1299 name TEXT NOT NULL,
1300+ image TEXT NOT NULL,
1301 started_at INTEGER CHECK (started_at > 0),
1302 finished_at INTEGER CHECK (finished_at > 0)
1303 ) STRICT ;