use chrono::Utc; use tokio::io::{AsyncReadExt, BufReader}; use tokio::process::Command; use uuid::Uuid; use crate::{models::Deploy, AppState, DbPool}; /// Create a deploy record and push its ID onto the build queue. pub async fn enqueue_deploy( state: &AppState, app_id: &str, triggered_by: &str, sha: Option, ) -> anyhow::Result { let id = Uuid::new_v4().to_string(); let now = Utc::now().to_rfc3339(); sqlx::query( "INSERT INTO deploys (id, app_id, sha, status, log, triggered_by, created_at) VALUES (?, ?, ?, 'queued', '', ?, ?)", ) .bind(&id) .bind(app_id) .bind(&sha) .bind(triggered_by) .bind(&now) .execute(&state.db) .await?; state.build_queue.lock().await.push_back(id.clone()); let deploy = sqlx::query_as::<_, Deploy>("SELECT * FROM deploys WHERE id = ?") .bind(&id) .fetch_one(&state.db) .await?; Ok(deploy) } /// Long-running background task — processes one deploy at a time. pub async fn build_worker(state: AppState) { loop { let deploy_id = state.build_queue.lock().await.pop_front(); match deploy_id { Some(id) => { if let Err(e) = run_build(&state, &id).await { tracing::error!("Build {} failed: {}", id, e); // Surface the Rust-level error in the deploy log so it's visible in the UI. let msg = format!("\n[hiy] FATAL: {}\n", e); let _ = append_log(&state.db, &id, &msg).await; let _ = set_status(&state.db, &id, "failed").await; } } None => { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } } } } async fn run_build(state: &AppState, deploy_id: &str) -> anyhow::Result<()> { let deploy = sqlx::query_as::<_, Deploy>("SELECT * FROM deploys WHERE id = ?") .bind(deploy_id) .fetch_one(&state.db) .await?; let app = sqlx::query_as::<_, crate::models::App>("SELECT * FROM apps WHERE id = ?") .bind(&deploy.app_id) .fetch_one(&state.db) .await?; // Write env file so the build script can inject it into the container. let env_dir = format!("{}/envs", state.data_dir); std::fs::create_dir_all(&env_dir)?; let env_file = format!("{}/{}.env", env_dir, app.id); let env_vars = sqlx::query_as::<_, crate::models::EnvVar>( "SELECT * FROM env_vars WHERE app_id = ?", ) .bind(&app.id) .fetch_all(&state.db) .await?; let mut env_content = String::new(); for e in &env_vars { let plain = crate::crypto::decrypt(&e.value) .unwrap_or_else(|err| { tracing::warn!("Could not decrypt env var {}: {} — using raw value", e.key, err); e.value.clone() }); env_content.push_str(&format!("{}={}\n", e.key, plain)); } std::fs::write(&env_file, env_content)?; // Mark as building. let now = Utc::now().to_rfc3339(); sqlx::query("UPDATE deploys SET status = 'building', started_at = ? WHERE id = ?") .bind(&now) .bind(deploy_id) .execute(&state.db) .await?; let build_script = std::env::var("HIY_BUILD_SCRIPT") .unwrap_or_else(|_| "./builder/build.sh".into()); // For git-push deploys, use the local bare repo instead of the remote URL. let repo_url = if deploy.triggered_by == "git-push" { format!("file://{}/repos/{}.git", state.data_dir, app.id) } else { app.repo_url.clone() }; let build_dir = format!("{}/builds/{}", state.data_dir, app.id); // Log diagnostics before spawning so even a spawn failure leaves a breadcrumb. let cwd = std::env::current_dir() .map(|p| p.display().to_string()) .unwrap_or_else(|_| "".into()); let script_exists = std::path::Path::new(&build_script).exists(); append_log( &state.db, deploy_id, &format!( "[hiy] CWD: {}\n\ [hiy] Build script: {} (exists={})\n\ [hiy] Build dir: {}\n\ [hiy] Env file: {}\n---\n", cwd, build_script, script_exists, build_dir, env_file ), ) .await?; let domain_suffix = std::env::var("DOMAIN_SUFFIX").unwrap_or_else(|_| "localhost".into()); let caddy_api_url = std::env::var("CADDY_API_URL").unwrap_or_else(|_| "http://localhost:2019".into()); let mut cmd = Command::new("bash"); cmd.arg(&build_script) .env("APP_ID", &app.id) .env("APP_NAME", &app.name) .env("REPO_URL", &repo_url); // Decrypt the git token (if any) and pass it separately so build.sh can // inject it into the clone URL without it appearing in REPO_URL or logs. if let Some(enc) = &app.git_token { match crate::crypto::decrypt(enc) { Ok(tok) => { cmd.env("GIT_TOKEN", tok); } Err(e) => tracing::warn!("Could not decrypt git_token for {}: {}", app.id, e), } } let mut child = cmd .env("BRANCH", &app.branch) .env("PORT", app.port.to_string()) .env("ENV_FILE", &env_file) .env("SHA", deploy.sha.as_deref().unwrap_or("")) .env("BUILD_DIR", &build_dir) .env("MEMORY_LIMIT", &app.memory_limit) .env("CPU_LIMIT", &app.cpu_limit) .env("IS_PUBLIC", if app.is_public != 0 { "1" } else { "0" }) .env("DOMAIN_SUFFIX", &domain_suffix) .env("CADDY_API_URL", &caddy_api_url) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .spawn() .map_err(|e| anyhow::anyhow!("Failed to spawn '{}': {}", build_script, e))?; let stdout = child.stdout.take().expect("piped stdout"); let stderr = child.stderr.take().expect("piped stderr"); // Read stdout/stderr in 4 KB chunks so we stream incrementally AND capture // any partial last line that has no trailing newline (which lines() drops). let db1 = state.db.clone(); let id1 = deploy_id.to_string(); let stdout_task = tokio::spawn(async move { let mut reader = BufReader::new(stdout); let mut buf = vec![0u8; 4096]; loop { match reader.read(&mut buf).await { Ok(0) | Err(_) => break, Ok(n) => { let chunk = String::from_utf8_lossy(&buf[..n]).into_owned(); let _ = append_log(&db1, &id1, &chunk).await; } } } }); let db2 = state.db.clone(); let id2 = deploy_id.to_string(); let stderr_task = tokio::spawn(async move { let mut reader = BufReader::new(stderr); let mut buf = vec![0u8; 4096]; loop { match reader.read(&mut buf).await { Ok(0) | Err(_) => break, Ok(n) => { let chunk = String::from_utf8_lossy(&buf[..n]).into_owned(); let _ = append_log(&db2, &id2, &chunk).await; } } } }); let exit_status = child.wait().await?; let _ = tokio::join!(stdout_task, stderr_task); // Always record the exit code — the one line that survives even silent failures. let code = exit_status.code().map(|c| c.to_string()).unwrap_or_else(|| "signal".into()); append_log(&state.db, deploy_id, &format!("\n[hiy] exit code: {}\n", code)).await?; let final_status = if exit_status.success() { "success" } else { "failed" }; let finished = Utc::now().to_rfc3339(); sqlx::query("UPDATE deploys SET status = ?, finished_at = ? WHERE id = ?") .bind(final_status) .bind(&finished) .bind(deploy_id) .execute(&state.db) .await?; tracing::info!("Deploy {} finished: {}", deploy_id, final_status); Ok(()) } async fn append_log(db: &DbPool, deploy_id: &str, line: &str) -> anyhow::Result<()> { sqlx::query("UPDATE deploys SET log = log || ? WHERE id = ?") .bind(line) .bind(deploy_id) .execute(db) .await?; Ok(()) } async fn set_status(db: &DbPool, deploy_id: &str, status: &str) -> anyhow::Result<()> { sqlx::query("UPDATE deploys SET status = ? WHERE id = ?") .bind(status) .bind(deploy_id) .execute(db) .await?; Ok(()) }