diff --git a/src/lib.rs b/src/lib.rs index 51b9a06..08c0f54 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,7 @@ //! //! ```toml //! [dependencies] -//! xtask-watch = "0.1.0" +//! xtask-watch = "0.3" //! ``` //! //! # Examples @@ -162,12 +162,12 @@ use anyhow::{Context, Result}; use clap::Parser; use glob::Pattern; use lazy_static::lazy_static; -use notify::{Event, EventHandler, RecursiveMode, Watcher}; +use notify::Watcher as _; use std::{ env, io, path::{Path, PathBuf}, process::{Child, Command, ExitStatus}, - sync::{Arc, Mutex, mpsc}, + sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, mpsc}, thread, time::{Duration, Instant}, }; @@ -200,6 +200,9 @@ pub fn xtask_command() -> Command { /// Watches over your project's source code, relaunching a given command when /// changes are detected. +/// +/// Use [`Watch::lock`] to obtain a [`WatchLock`] that can be shared with external +/// code (e.g. an HTTP server) to coordinate reads with ongoing rebuilds. #[non_exhaustive] #[derive(Clone, Debug, Default, Parser)] #[clap(about = "Watches over your project's source code.")] @@ -235,6 +238,8 @@ pub struct Watch { exclude_globs: Vec, #[clap(skip)] workspace_exclude_globs: Vec, + #[clap(skip)] + watch_lock: WatchLock, } impl Watch { @@ -287,6 +292,24 @@ impl Watch { self } + /// Return the shared lock used by this watcher. + /// + /// Clone and share this lock with external code (e.g. HTTP handlers) to coordinate with + /// watch-driven command execution. + /// + /// # Lock lifecycle + /// + /// [`run`](Self::run) acquires the **write lock immediately** when it is called — before the + /// first command is even spawned. Any code that calls [`WatchLock::acquire`] will therefore + /// block until the first build completes. This is intentional: it prevents readers from + /// observing an empty or incomplete dist directory before the initial build has finished. + /// The write lock is then re-acquired on every subsequent rebuild and released once the + /// command sequence succeeds. + #[must_use = "store and share the lock with readers that must coordinate with rebuilds"] + pub fn lock(&self) -> WatchLock { + self.watch_lock.clone() + } + /// Set the debounce duration after relaunching the command. pub fn debounce(mut self, duration: Duration) -> Self { self.debounce = duration; @@ -302,7 +325,10 @@ impl Watch { let list = commands.into(); { - let mut commands = list.commands.lock().expect("not poisoned"); + let mut commands = list + .commands + .lock() + .expect("no panic-prone code runs while this lock is held"); commands.extend(self.shell_commands.iter().map(|x| { let mut command = @@ -343,7 +369,7 @@ impl Watch { let handler = WatchEventHandler { watch: self.clone(), - tx, + tx: tx.clone(), command_start: Instant::now(), }; @@ -351,34 +377,42 @@ impl Watch { notify::recommended_watcher(handler).context("could not initialize watcher")?; for path in &self.watch_paths { - match watcher.watch(path, RecursiveMode::Recursive) { + match watcher.watch(path, notify::RecursiveMode::Recursive) { Ok(()) => log::trace!("Watching {}", path.display()), Err(err) => log::error!("cannot watch {}: {err}", path.display()), } } let mut current_child = SharedChild::new(); + let mut lock_guard = Some(self.watch_lock.write()); + let mut generation: u64 = 0; loop { - { - log::info!("Re-running command"); + if lock_guard.is_some() { + log::info!("Running command"); let mut current_child = current_child.clone(); let mut list = list.clone(); + let tx = tx.clone(); + let build_id = generation; thread::spawn(move || { let mut status = ExitStatus::default(); + list.spawn(|res| match res { Err(err) => { log::error!("Could not execute command: {err}"); false } Ok(child) => { - log::trace!("new child: {}", child.id()); + log::trace!("Child spawned PID: {}", child.id()); current_child.replace(child); status = current_child.wait(); status.success() } }); + if status.success() { log::info!("Command succeeded."); + tx.send(Event::CommandSucceeded(build_id)) + .expect("can send"); } else if let Some(code) = status.code() { log::error!("Command failed (exit code: {code})"); } else { @@ -387,13 +421,27 @@ impl Watch { }); } - let res = rx.recv(); - if res.is_ok() { - log::trace!("Changes detected, re-generating"); - } - current_child.terminate(); - if res.is_err() { - break; + match rx.recv() { + Ok(Event::ChangeDetected) => { + log::trace!("Changes detected, re-generating"); + if lock_guard.is_none() { + lock_guard = Some(self.watch_lock.write()); + } + generation += 1; + current_child.terminate(); + } + Ok(Event::CommandSucceeded(build_id)) if build_id == generation => { + lock_guard.take(); + } + Ok(Event::CommandSucceeded(build_id)) => { + log::trace!( + "Ignoring stale success from build {build_id} (current: {generation})" + ); + } + Err(_) => { + current_child.terminate(); + break; + } } } @@ -514,15 +562,15 @@ impl Watch { struct WatchEventHandler { watch: Watch, - tx: mpsc::Sender<()>, + tx: mpsc::Sender, command_start: Instant, } -impl EventHandler for WatchEventHandler { - fn handle_event(&mut self, event: Result) { +impl notify::EventHandler for WatchEventHandler { + fn handle_event(&mut self, event: Result) { match event { Ok(event) => { - if (event.kind.is_modify() || event.kind.is_create() || event.kind.is_create()) + if (event.kind.is_modify() || event.kind.is_create()) && event.paths.iter().any(|x| { !self.watch.is_excluded_path(x) && x.exists() @@ -534,7 +582,7 @@ impl EventHandler for WatchEventHandler { log::trace!("Changes detected in {event:?}"); self.command_start = Instant::now(); - self.tx.send(()).expect("can send"); + self.tx.send(Event::ChangeDetected).expect("can send"); } else { log::trace!("Ignoring changes in {event:?}"); } @@ -557,12 +605,18 @@ impl SharedChild { } fn replace(&mut self, child: impl Into>) { - *self.child.lock().expect("not poisoned") = child.into(); + *self + .child + .lock() + .expect("no panic-prone code runs while this lock is held") = child.into(); } fn wait(&mut self) -> ExitStatus { loop { - let mut child = self.child.lock().expect("not poisoned"); + let mut child = self + .child + .lock() + .expect("no panic-prone code runs while this lock is held"); match child.as_mut().map(|child| child.try_wait()) { Some(Ok(Some(status))) => { break status; @@ -583,7 +637,12 @@ impl SharedChild { } fn terminate(&mut self) { - if let Some(child) = self.child.lock().expect("not poisoned").as_mut() { + if let Some(child) = self + .child + .lock() + .expect("no panic-prone code runs while this lock is held") + .as_mut() + { #[cfg(unix)] { let killing_start = Instant::now(); @@ -648,14 +707,22 @@ impl From<[Command; SIZE]> for CommandList { impl CommandList { /// Returns `true` if the list is empty. pub fn is_empty(&self) -> bool { - self.commands.lock().expect("not poisoned").is_empty() + self.commands + .lock() + .expect("no panic-prone code runs while this lock is held") + .is_empty() } /// Spawn each command of the list one after the other. /// /// The caller is responsible to wait the commands. pub fn spawn(&mut self, mut callback: impl FnMut(io::Result) -> bool) { - for process in self.commands.lock().expect("not poisoned").iter_mut() { + for process in self + .commands + .lock() + .expect("no panic-prone code runs while this lock is held") + .iter_mut() + { if !callback(process.spawn()) { break; } @@ -665,7 +732,12 @@ impl CommandList { /// Run all the commands sequentially using [`std::process::Command::status`] and stop at the /// first failure. pub fn status(&mut self) -> io::Result { - for process in self.commands.lock().expect("not poisoned").iter_mut() { + for process in self + .commands + .lock() + .expect("no panic-prone code runs while this lock is held") + .iter_mut() + { let exit_status = process.status()?; if !exit_status.success() { return Ok(exit_status); @@ -675,6 +747,46 @@ impl CommandList { } } +/// Guard returned by [`WatchLock::acquire`]. +/// +/// Keep this value alive for the duration of the protected read section. +/// The lock is released automatically when the guard is dropped. +pub struct WatchLockGuard<'a> { + _guard: RwLockReadGuard<'a, ()>, +} + +/// A lock handle used to coordinate file reads with watch-driven rebuilds. +/// +/// Obtain it from [`Watch::lock`], clone it, and call [`WatchLock::acquire`] while +/// reading files that must not race with rebuild writes. +#[derive(Clone, Debug, Default)] +pub struct WatchLock(Arc>); + +impl WatchLock { + /// Acquire shared access to the protected section. + /// + /// Multiple readers may hold this guard concurrently. + pub fn acquire(&self) -> WatchLockGuard<'_> { + WatchLockGuard { + // The inner value is `()` — there is no data to corrupt, so we can + // always recover from a poisoned lock. + _guard: self.0.read().unwrap_or_else(|e| e.into_inner()), + } + } + + fn write(&self) -> RwLockWriteGuard<'_, ()> { + // The inner value is `()` — there is no data to corrupt, so we can + // always recover from a poisoned lock. + self.0.write().unwrap_or_else(|e| e.into_inner()) + } +} + +#[derive(Debug)] +enum Event { + CommandSucceeded(u64), + ChangeDetected, +} + #[cfg(test)] mod test { use super::*;