diff --git a/anneal/v2/src/util.rs b/anneal/v2/src/util.rs new file mode 100644 index 0000000000..e3533277f5 --- /dev/null +++ b/anneal/v2/src/util.rs @@ -0,0 +1,338 @@ +// Copyright 2026 The Fuchsia Authors +// +// Licensed under the 2-Clause BSD License , Apache License, Version 2.0 +// , or the MIT +// license , at your option. +// This file may not be copied, modified, or distributed except according to +// those terms. + +use anyhow::Context as _; +use fs2::FileExt as _; +use std::io::BufRead as _; + +/// Represents an active, exclusive lock on a directory. +/// +/// This struct guarantees that the process holds an OS-level file lock +/// guarding the specified directory. +pub(crate) struct DirLock { + /// The path to the directory being guarded. + pub(crate) path: std::path::PathBuf, + // Kept alive to hold the flock. + _file: std::fs::File, +} + +impl DirLock { + /// Acquires an exclusive lock on the specified directory. + /// + /// This function blocks until the lock can be acquired. We use a + /// separate `.lock` file within the directory rather than locking + /// the directory itself to avoid platform-specific issues with + /// directory locking and to ensure the lock file persists even if + /// the directory is cleaned. + pub(crate) fn lock_exclusive(path: std::path::PathBuf) -> anyhow::Result { + let file = Self::open_lock_file(&path)?; + file.lock_exclusive() + .with_context(|| format!("Failed to acquire exclusive lock on {:?}", path))?; + Ok(Self { path, _file: file }) + } + + /// Acquires a shared lock on the specified directory. + /// + /// Multiple processes can hold shared locks simultaneously, but an + /// exclusive lock will block until all shared locks are released. + pub(crate) fn lock_shared(path: std::path::PathBuf) -> anyhow::Result { + let file = Self::open_lock_file(&path)?; + file.lock_shared() + .with_context(|| format!("Failed to acquire shared lock on {:?}", path))?; + Ok(Self { path, _file: file }) + } + + fn open_lock_file(path: &std::path::Path) -> anyhow::Result { + let lock_path = path.join(".lock"); + + // Ensure the directory exists. + if let Some(parent) = lock_path.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!("Failed to create directory for lock file: {:?}", parent) + })?; + } + // If the lock file already exists, we open it in read-only mode. + // This prevents failures if the file is read-only (e.g., after + // making the toolchain directory read-only), while still allowing + // us to acquire shared and exclusive locks on the file descriptor. + if lock_path.exists() { + std::fs::OpenOptions::new() + .read(true) + .open(&lock_path) + .with_context(|| format!("Failed to open lock file at {:?}", lock_path)) + } else { + std::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&lock_path) + .with_context(|| format!("Failed to create lock file at {:?}", lock_path)) + } + } +} + +/// Walks a directory recursively and replaces string patterns inside `.trace` +/// files. This is used to patch non-portable paths generated by Lake. +pub(crate) fn patch_trace_files( + dir: &std::path::Path, + replacements: &[(&str, &str)], +) -> anyhow::Result<()> { + if dir.exists() { + let walker = walkdir::WalkDir::new(dir).into_iter(); + for entry in walker { + let entry = entry.context("Failed to walk directory for trace patching")?; + let path = entry.path(); + if path.is_file() && path.extension().map_or(false, |ext| ext == "trace") { + let content = std::fs::read_to_string(path) + .with_context(|| format!("Failed to read trace file {:?}", path))?; + let mut new_content = content.clone(); + for (from, to) in replacements { + new_content = new_content.replace(from, to); + } + if new_content != content { + std::fs::write(path, new_content) + .with_context(|| format!("Failed to write trace file {:?}", path))?; + } + } + } + } + Ok(()) +} + +/// Prepends a path to an existing environment variable, +/// separating them with a colon if the variable is not empty. This contains the +/// pure string formatting logic. +fn prepend_to_env_var_impl( + current_val: std::ffi::OsString, + new_path: &std::path::Path, +) -> std::ffi::OsString { + let mut combined = new_path.to_path_buf().into_os_string(); + if !current_val.is_empty() { + combined.push(":"); + combined.push(current_val); + } + combined +} + +/// Prepends a path to an existing environment variable in the active process +/// environment, separating them with a colon if the variable is not empty. +/// This is used to inject our managed Rust toolchain paths before the system paths. +pub(crate) fn prepend_to_env_var(var_name: &str, new_path: &std::path::Path) -> std::ffi::OsString { + let current_val = std::env::var_os(var_name).unwrap_or_default(); + prepend_to_env_var_impl(current_val, new_path) +} + +pub(crate) struct ProcessOutput { + pub status: std::process::ExitStatus, + pub stderr_lines: Vec, +} + +/// Spawns a child process, drains its stderr in a background thread, and processes +/// its stdout line-by-line in the main thread while showing a progress spinner. +pub(crate) fn run_command_with_progress( + mut cmd: std::process::Command, + pb: Option, + mut process_stdout_line: F, +) -> anyhow::Result +where + F: FnMut(&str, Option<&indicatif::ProgressBar>) -> anyhow::Result<()>, +{ + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::piped()); + + let mut child = cmd.spawn().context("Failed to spawn child process")?; + + let stderr_buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let stderr_buffer_clone = std::sync::Arc::clone(&stderr_buffer); + + let mut stderr_thread = None; + if let Some(stderr) = child.stderr.take() { + stderr_thread = Some(std::thread::spawn(move || { + let reader = std::io::BufReader::new(stderr); + for line in reader.lines().map_while(Result::ok) { + stderr_buffer_clone.lock().unwrap().push(line); + } + })); + } + + if let Some(ref p) = pb { + p.enable_steady_tick(std::time::Duration::from_millis(100)); + } + + if let Some(stdout) = child.stdout.take() { + let reader = std::io::BufReader::new(stdout); + for line in reader.lines().map_while(Result::ok) { + process_stdout_line(&line, pb.as_ref())?; + if let Some(ref p) = pb { + p.tick(); + } + } + } + + if let Some(ref p) = pb { + p.finish_and_clear(); + } + + let status = child.wait().context("Failed to wait for child process")?; + + if let Some(thread) = stderr_thread { + let _ = thread.join(); + } + + let stderr_lines = std::sync::Arc::try_unwrap(stderr_buffer).unwrap().into_inner().unwrap(); + + Ok(ProcessOutput { status, stderr_lines }) +} + +#[cfg(feature = "exocrate_tests")] +pub(crate) fn run_test_lock_helper( + role: &str, + lock_dir: &std::path::Path, + log_file: &std::path::Path, + sig_file: &std::path::Path, +) -> anyhow::Result<()> { + use std::io::Write as _; + + let append_log = |msg: &str| -> anyhow::Result<()> { + let mut file = std::fs::OpenOptions::new().create(true).append(true).open(log_file)?; + writeln!(file, "{}", msg)?; + Ok(()) + }; + + let wait_for_sig = || -> anyhow::Result<()> { + let start = std::time::Instant::now(); + while !sig_file.exists() { + if start.elapsed() > std::time::Duration::from_secs(3) { + anyhow::bail!("Timeout waiting for signal file {:?}", sig_file); + } + std::thread::sleep(std::time::Duration::from_millis(50)); + } + Ok(()) + }; + + match role { + "reader-a" => { + let _lock = DirLock::lock_shared(lock_dir.to_path_buf())?; + append_log("SHARED_START_A")?; + wait_for_sig()?; + append_log("SHARED_END_A")?; + } + "reader-b" => { + let _lock = DirLock::lock_shared(lock_dir.to_path_buf())?; + append_log("SHARED_START_B")?; + std::fs::write(sig_file, "")?; + append_log("SHARED_END_B")?; + } + "writer-a" => { + let _lock = DirLock::lock_exclusive(lock_dir.to_path_buf())?; + append_log("EXCLUSIVE_START_A")?; + wait_for_sig()?; + append_log("EXCLUSIVE_END_A")?; + } + "reader-exclusion" => { + std::fs::write(sig_file, "")?; + let _lock = DirLock::lock_shared(lock_dir.to_path_buf())?; + append_log("SHARED_START_B")?; + append_log("SHARED_END_B")?; + } + _ => anyhow::bail!("Unknown test-lock-helper role: {}", role), + } + + Ok(()) +} + +#[cfg(test)] +#[macro_export] +macro_rules! workspace_fixture { + ($dir:expr, { $($path:expr => $content:expr),* $(,)? }) => {{ + let root = $dir.path(); + $( + let file_path = root.join($path); + if let Some(parent) = file_path.parent() { + std::fs::create_dir_all(parent).unwrap(); + } + std::fs::write(&file_path, $content).unwrap(); + )* + }}; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_prepend_to_env_var() { + // Test when current value is empty. + let path1 = std::path::Path::new("/path/one"); + let res1 = prepend_to_env_var_impl(std::ffi::OsString::new(), path1); + assert_eq!(res1, "/path/one"); + + // Test when current value is not empty. + let path2 = std::path::Path::new("/path/two"); + let res2 = prepend_to_env_var_impl(res1, path2); + assert_eq!(res2, "/path/two:/path/one"); + } + + #[test] + fn test_dir_lock_exclusive_mutual_exclusion() { + let temp_dir = tempfile::tempdir().unwrap(); + let lock_path = temp_dir.path().to_path_buf(); + + let barrier = std::sync::Arc::new(std::sync::Barrier::new(2)); + let barrier_clone = std::sync::Arc::clone(&barrier); + let lock_path_clone = lock_path.clone(); + + let lock_released = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let lock_released_clone = std::sync::Arc::clone(&lock_released); + + // Thread A acquires the lock. + let thread_a = std::thread::spawn(move || { + let _lock = DirLock::lock_exclusive(lock_path_clone).expect("Failed to lock exclusive"); + barrier_clone.wait(); // Signal Thread B that A holds the lock. + + // Simulate brief work holding the lock. + std::thread::sleep(std::time::Duration::from_millis(100)); + lock_released_clone.store(true, std::sync::atomic::Ordering::Relaxed); + // _lock drops here, releasing the lock. + }); + + // Thread B waits for Thread A to acquire the lock, then tries to acquire it itself. + let thread_b = std::thread::spawn(move || { + barrier.wait(); // Wait for Thread A to acquire lock. + + // Attempt to acquire lock. This should block until Thread A releases it. + let _lock = DirLock::lock_exclusive(lock_path).expect("Failed to lock exclusive in B"); + + // Assert that B only successfully locked the directory AFTER A released it. + assert!( + lock_released.load(std::sync::atomic::Ordering::Relaxed), + "Thread B acquired lock before Thread A released it!" + ); + }); + + thread_a.join().unwrap(); + thread_b.join().unwrap(); + } + + #[test] + fn test_dir_lock_shared_coexistence() { + let temp_dir = tempfile::tempdir().unwrap(); + let lock_path = temp_dir.path().to_path_buf(); + + // Thread A acquires shared lock. + let lock_a = DirLock::lock_shared(lock_path.clone()).expect("Failed to lock shared"); + + // Thread B should be able to acquire shared lock immediately without blocking. + let lock_b = DirLock::lock_shared(lock_path).expect("Failed to lock shared concurrently"); + + // Both locks are held. + drop(lock_a); + drop(lock_b); + } +}