Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 139 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
//!
//! ```toml
//! [dependencies]
//! xtask-watch = "0.1.0"
//! xtask-watch = "0.3"
//! ```
//!
//! # Examples
Expand Down Expand Up @@ -162,12 +162,12 @@ use anyhow::{Context, Result};
use clap::Parser;
use glob::Pattern;
use lazy_static::lazy_static;
use notify::{Event, EventHandler, RecursiveMode, Watcher};
use notify::Watcher as _;
use std::{
env, io,
path::{Path, PathBuf},
process::{Child, Command, ExitStatus},
sync::{Arc, Mutex, mpsc},
sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, mpsc},
thread,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -200,6 +200,9 @@ pub fn xtask_command() -> Command {

/// Watches over your project's source code, relaunching a given command when
/// changes are detected.
///
/// Use [`Watch::lock`] to obtain a [`WatchLock`] that can be shared with external
/// code (e.g. an HTTP server) to coordinate reads with ongoing rebuilds.
#[non_exhaustive]
#[derive(Clone, Debug, Default, Parser)]
#[clap(about = "Watches over your project's source code.")]
Expand Down Expand Up @@ -235,6 +238,8 @@ pub struct Watch {
exclude_globs: Vec<Pattern>,
#[clap(skip)]
workspace_exclude_globs: Vec<Pattern>,
#[clap(skip)]
watch_lock: WatchLock,
}

impl Watch {
Expand Down Expand Up @@ -287,6 +292,24 @@ impl Watch {
self
}

/// Return the shared lock used by this watcher.
///
/// Clone and share this lock with external code (e.g. HTTP handlers) to coordinate with
/// watch-driven command execution.
///
/// # Lock lifecycle
///
/// [`run`](Self::run) acquires the **write lock immediately** when it is called — before the
/// first command is even spawned. Any code that calls [`WatchLock::acquire`] will therefore
/// block until the first build completes. This is intentional: it prevents readers from
/// observing an empty or incomplete dist directory before the initial build has finished.
/// The write lock is then re-acquired on every subsequent rebuild and released once the
/// command sequence succeeds.
#[must_use = "store and share the lock with readers that must coordinate with rebuilds"]
pub fn lock(&self) -> WatchLock {
Comment thread
cecton marked this conversation as resolved.
self.watch_lock.clone()
}

/// Set the debounce duration after relaunching the command.
pub fn debounce(mut self, duration: Duration) -> Self {
self.debounce = duration;
Expand All @@ -302,7 +325,10 @@ impl Watch {
let list = commands.into();

{
let mut commands = list.commands.lock().expect("not poisoned");
let mut commands = list
.commands
.lock()
.expect("no panic-prone code runs while this lock is held");

commands.extend(self.shell_commands.iter().map(|x| {
let mut command =
Expand Down Expand Up @@ -343,42 +369,50 @@ impl Watch {

let handler = WatchEventHandler {
watch: self.clone(),
tx,
tx: tx.clone(),
command_start: Instant::now(),
};

let mut watcher =
notify::recommended_watcher(handler).context("could not initialize watcher")?;

for path in &self.watch_paths {
match watcher.watch(path, RecursiveMode::Recursive) {
match watcher.watch(path, notify::RecursiveMode::Recursive) {
Ok(()) => log::trace!("Watching {}", path.display()),
Err(err) => log::error!("cannot watch {}: {err}", path.display()),
}
}

let mut current_child = SharedChild::new();
let mut lock_guard = Some(self.watch_lock.write());
let mut generation: u64 = 0;
loop {
Comment thread
cecton marked this conversation as resolved.
{
log::info!("Re-running command");
if lock_guard.is_some() {
log::info!("Running command");
let mut current_child = current_child.clone();
let mut list = list.clone();
let tx = tx.clone();
let build_id = generation;
thread::spawn(move || {
let mut status = ExitStatus::default();

list.spawn(|res| match res {
Err(err) => {
log::error!("Could not execute command: {err}");
false
}
Ok(child) => {
log::trace!("new child: {}", child.id());
log::trace!("Child spawned PID: {}", child.id());
current_child.replace(child);
status = current_child.wait();
status.success()
}
});

if status.success() {
log::info!("Command succeeded.");
tx.send(Event::CommandSucceeded(build_id))
.expect("can send");
} else if let Some(code) = status.code() {
log::error!("Command failed (exit code: {code})");
} else {
Expand All @@ -387,13 +421,27 @@ impl Watch {
});
}

let res = rx.recv();
if res.is_ok() {
log::trace!("Changes detected, re-generating");
}
current_child.terminate();
if res.is_err() {
break;
match rx.recv() {
Ok(Event::ChangeDetected) => {
log::trace!("Changes detected, re-generating");
if lock_guard.is_none() {
lock_guard = Some(self.watch_lock.write());
}
generation += 1;
current_child.terminate();
}
Ok(Event::CommandSucceeded(build_id)) if build_id == generation => {
lock_guard.take();
}
Ok(Event::CommandSucceeded(build_id)) => {
log::trace!(
"Ignoring stale success from build {build_id} (current: {generation})"
);
}
Err(_) => {
current_child.terminate();
break;
}
Comment thread
cecton marked this conversation as resolved.
}
}

Expand Down Expand Up @@ -514,15 +562,15 @@ impl Watch {

struct WatchEventHandler {
watch: Watch,
tx: mpsc::Sender<()>,
tx: mpsc::Sender<Event>,
command_start: Instant,
}

impl EventHandler for WatchEventHandler {
fn handle_event(&mut self, event: Result<Event, notify::Error>) {
impl notify::EventHandler for WatchEventHandler {
fn handle_event(&mut self, event: Result<notify::Event, notify::Error>) {
match event {
Ok(event) => {
if (event.kind.is_modify() || event.kind.is_create() || event.kind.is_create())
if (event.kind.is_modify() || event.kind.is_create())
&& event.paths.iter().any(|x| {
!self.watch.is_excluded_path(x)
&& x.exists()
Expand All @@ -534,7 +582,7 @@ impl EventHandler for WatchEventHandler {
log::trace!("Changes detected in {event:?}");
self.command_start = Instant::now();

self.tx.send(()).expect("can send");
self.tx.send(Event::ChangeDetected).expect("can send");
} else {
log::trace!("Ignoring changes in {event:?}");
}
Expand All @@ -557,12 +605,18 @@ impl SharedChild {
}

fn replace(&mut self, child: impl Into<Option<Child>>) {
*self.child.lock().expect("not poisoned") = child.into();
*self
.child
.lock()
.expect("no panic-prone code runs while this lock is held") = child.into();
}

fn wait(&mut self) -> ExitStatus {
loop {
let mut child = self.child.lock().expect("not poisoned");
let mut child = self
.child
.lock()
.expect("no panic-prone code runs while this lock is held");
match child.as_mut().map(|child| child.try_wait()) {
Some(Ok(Some(status))) => {
break status;
Expand All @@ -583,7 +637,12 @@ impl SharedChild {
}

fn terminate(&mut self) {
if let Some(child) = self.child.lock().expect("not poisoned").as_mut() {
if let Some(child) = self
.child
.lock()
.expect("no panic-prone code runs while this lock is held")
.as_mut()
{
#[cfg(unix)]
{
let killing_start = Instant::now();
Expand Down Expand Up @@ -648,14 +707,22 @@ impl<const SIZE: usize> From<[Command; SIZE]> for CommandList {
impl CommandList {
/// Returns `true` if the list is empty.
pub fn is_empty(&self) -> bool {
self.commands.lock().expect("not poisoned").is_empty()
self.commands
.lock()
.expect("no panic-prone code runs while this lock is held")
.is_empty()
}

/// Spawn each command of the list one after the other.
///
/// The caller is responsible to wait the commands.
pub fn spawn(&mut self, mut callback: impl FnMut(io::Result<Child>) -> bool) {
for process in self.commands.lock().expect("not poisoned").iter_mut() {
for process in self
.commands
.lock()
.expect("no panic-prone code runs while this lock is held")
.iter_mut()
{
if !callback(process.spawn()) {
break;
}
Expand All @@ -665,7 +732,12 @@ impl CommandList {
/// Run all the commands sequentially using [`std::process::Command::status`] and stop at the
/// first failure.
pub fn status(&mut self) -> io::Result<ExitStatus> {
for process in self.commands.lock().expect("not poisoned").iter_mut() {
for process in self
.commands
.lock()
.expect("no panic-prone code runs while this lock is held")
.iter_mut()
{
let exit_status = process.status()?;
if !exit_status.success() {
return Ok(exit_status);
Expand All @@ -675,6 +747,46 @@ impl CommandList {
}
}

/// Guard returned by [`WatchLock::acquire`].
///
/// Keep this value alive for the duration of the protected read section.
/// The lock is released automatically when the guard is dropped.
pub struct WatchLockGuard<'a> {
_guard: RwLockReadGuard<'a, ()>,
}

/// A lock handle used to coordinate file reads with watch-driven rebuilds.
///
/// Obtain it from [`Watch::lock`], clone it, and call [`WatchLock::acquire`] while
/// reading files that must not race with rebuild writes.
#[derive(Clone, Debug, Default)]
pub struct WatchLock(Arc<RwLock<()>>);

impl WatchLock {
/// Acquire shared access to the protected section.
///
/// Multiple readers may hold this guard concurrently.
pub fn acquire(&self) -> WatchLockGuard<'_> {
WatchLockGuard {
// The inner value is `()` — there is no data to corrupt, so we can
// always recover from a poisoned lock.
_guard: self.0.read().unwrap_or_else(|e| e.into_inner()),
}
}

fn write(&self) -> RwLockWriteGuard<'_, ()> {
// The inner value is `()` — there is no data to corrupt, so we can
// always recover from a poisoned lock.
self.0.write().unwrap_or_else(|e| e.into_inner())
}
}

#[derive(Debug)]
enum Event {
CommandSucceeded(u64),
ChangeDetected,
}
Comment thread
cecton marked this conversation as resolved.

#[cfg(test)]
mod test {
use super::*;
Expand Down
Loading