From c4a5f3b1a3b1a96b5818e1011cfc59ac26bbb7c3 Mon Sep 17 00:00:00 2001 From: Dylan Wolff Date: Tue, 9 Sep 2025 00:28:18 +0000 Subject: [PATCH 1/8] Adding TimeModels to Shuttle --- shuttle/Cargo.toml | 3 +- shuttle/src/runtime/execution.rs | 45 +- shuttle/src/runtime/runner.rs | 24 +- shuttle/src/runtime/thread/continuation.rs | 2 + shuttle/src/sync/mod.rs | 1 + shuttle/src/sync/time/constant_stepped.rs | 146 ++++ shuttle/src/sync/time/frozen.rs | 141 ++++ shuttle/src/sync/time/mod.rs | 825 +++++++++++++++++++++ shuttle/src/thread.rs | 10 +- shuttle/tests/basic/mod.rs | 1 + shuttle/tests/basic/pct.rs | 36 +- shuttle/tests/basic/time.rs | 357 +++++++++ 12 files changed, 1558 insertions(+), 33 deletions(-) create mode 100644 shuttle/src/sync/time/constant_stepped.rs create mode 100644 shuttle/src/sync/time/frozen.rs create mode 100644 shuttle/src/sync/time/mod.rs create mode 100644 shuttle/tests/basic/time.rs diff --git a/shuttle/Cargo.toml b/shuttle/Cargo.toml index 23cc9f91..074d9913 100644 --- a/shuttle/Cargo.toml +++ b/shuttle/Cargo.toml @@ -15,6 +15,7 @@ bitvec = "1.0.1" cfg-if = "1.0" hex = "0.4.2" owo-colors = "3.5.0" +pin-project = "1.1.3" rand_core = "0.6.4" rand = "0.8.6" rand_pcg = "0.3.1" @@ -39,7 +40,6 @@ tempfile = "3.2.0" test-log = { version = "0.2.8", default-features = false, features = ["trace"] } tracing-subscriber = { version = "0.3.9", features = ["env-filter"] } trybuild = "1.0" -pin-project = "1.1.3" # The following line is necessary to ensure vector-clocks are used for integration tests # To run performance tests without vector clocks using `cargo bench`, this line must be commented out shuttle = { path = ".", features = ["vector-clocks"] } @@ -56,6 +56,7 @@ annotation = ["dep:serde", "dep:serde_json", "dep:regex"] # are otherwise always enabled via a dev-dependency to ensure all *test* assertions utilizing vector # clocks behave correctly during testing bench-no-vector-clocks = [] +advanced-time-models = [] [[bench]] name = "lock" diff --git a/shuttle/src/runtime/execution.rs b/shuttle/src/runtime/execution.rs index 1b063445..287192e2 100644 --- a/shuttle/src/runtime/execution.rs +++ b/shuttle/src/runtime/execution.rs @@ -6,6 +6,7 @@ use crate::runtime::task::{ChildLabelFn, Task, TaskId, TaskName, TaskSignature, use crate::runtime::thread; use crate::runtime::thread::continuation::PooledContinuation; use crate::scheduler::{Schedule, Scheduler}; +use crate::sync::time::{get_time_model, TimeModel}; use crate::sync::{ResourceSignature, ResourceType}; use crate::thread::thread_fn; use crate::{backtrace_enabled, Config, MaxSteps, UNGRACEFUL_SHUTDOWN_CONFIG}; @@ -91,15 +92,21 @@ thread_local! { /// static variable, but clients get access to it by calling `ExecutionState::with`. pub(crate) struct Execution { scheduler: Rc>, + time_model: Rc>, initial_schedule: Schedule, } impl Execution { /// Construct a new execution that will use the given scheduler. The execution should then be /// invoked via its `run` method, which takes as input the closure for task 0. - pub(crate) fn new(scheduler: Rc>, initial_schedule: Schedule) -> Self { + pub(crate) fn new( + scheduler: Rc>, + initial_schedule: Schedule, + time_model: Rc>, + ) -> Self { Self { scheduler, + time_model, initial_schedule, } } @@ -130,6 +137,15 @@ impl StepError { } } +/// While there are no runnable tasks and tasks are able to be woken by the time model, continually wakes tasks. +/// The TimeModel's `wake_next` method is called in a loop in case there are stale wakers which do not actually +/// result in a newly runnable task when woken. Requires access to the ExecutionState to obtain a reference to the +/// time model. +fn wake_sleepers_until_runnable() { + let tm = get_time_model(); + while ExecutionState::num_runnable() == 0 && tm.borrow_mut().wake_next() {} +} + impl Execution { /// Run a function to be tested, taking control of scheduling it and any tasks it might spawn. /// This function runs until `f` and all tasks spawned by `f` have terminated, or until the @@ -138,7 +154,11 @@ impl Execution { where F: FnOnce() + Send + 'static, { - let state = RefCell::new(ExecutionState::new(config.clone(), Rc::clone(&self.scheduler))); + let state = RefCell::new(ExecutionState::new( + config.clone(), + Rc::clone(&self.scheduler), + Rc::clone(&self.time_model), + )); init_panic_hook(config.clone()); CurrentSchedule::init(self.initial_schedule.clone()); @@ -245,6 +265,7 @@ impl Execution { #[inline] fn run_to_competion(&mut self, immediately_return_on_panic: bool) -> Result<(), StepError> { loop { + wake_sleepers_until_runnable(); let next_step: Option>> = ExecutionState::with(|state| { state.schedule()?; state.advance_to_next_task(); @@ -340,6 +361,10 @@ pub(crate) struct ExecutionState { // Persistent Vec used as a bump allocator for references to runnable tasks to avoid slow allocation // on each scheduling decision. Should not be used outside of the `schedule` function runnable_tasks: Vec<*const Task>, + + // Counter for unique timing resource ids (Sleeps, Timeouts and Intervals) + pub(crate) timer_id_counter: u64, + pub(crate) time_model: Rc>, } #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -373,7 +398,7 @@ pub enum ExecutionStateBorrowError { } impl ExecutionState { - fn new(config: Config, scheduler: Rc>) -> Self { + fn new(config: Config, scheduler: Rc>, time_model: Rc>) -> Self { Self { config, tasks: SmallVec::new(), @@ -389,6 +414,8 @@ impl ExecutionState { has_cleaned_up: false, top_level_span: tracing::Span::current(), runnable_tasks: Vec::with_capacity(DEFAULT_INLINE_TASKS), + time_model, + timer_id_counter: 0, } } @@ -665,6 +692,8 @@ impl ExecutionState { TASK_ID_TO_TAGS.with(|cell| cell.borrow_mut().clear()); LABELS.with(|cell| cell.borrow_mut().clear()); + Self::with(|s| s.timer_id_counter = 0); + #[cfg(debug_assertions)] Self::with(|state| state.has_cleaned_up = true); @@ -680,6 +709,8 @@ impl ExecutionState { /// is different from the currently running task, indicating that the current task should yield /// its execution. pub(crate) fn maybe_yield() -> bool { + wake_sleepers_until_runnable(); + Self::with(|state| { if std::thread::panicking() && !state.in_cleanup { return true; @@ -786,11 +817,19 @@ impl ExecutionState { Self::with(|state| state.context_switches) } + pub(crate) fn num_tasks(&self) -> usize { + self.tasks.len() + } + #[track_caller] pub(crate) fn new_resource_signature(resource_type: ResourceType) -> ResourceSignature { ExecutionState::with(|s| s.current_mut().signature.new_resource(resource_type)) } + pub(crate) fn num_runnable() -> usize { + Self::with(|state| state.tasks.iter().filter(|t| t.runnable()).count()) + } + pub(crate) fn get_storage, T: 'static>(&self, key: K) -> Option<&T> { self.storage .get(key.into()) diff --git a/shuttle/src/runtime/runner.rs b/shuttle/src/runtime/runner.rs index 11ea0b42..974f4163 100644 --- a/shuttle/src/runtime/runner.rs +++ b/shuttle/src/runtime/runner.rs @@ -3,6 +3,7 @@ use crate::runtime::task::{Task, TaskId}; use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL}; use crate::scheduler::metrics::MetricsScheduler; use crate::scheduler::{Schedule, Scheduler}; +use crate::sync::time::{frozen::FrozenTimeModel, TimeModel}; use crate::Config; use std::cell::RefCell; use std::fmt; @@ -52,18 +53,33 @@ impl Drop for ResetSpanOnDrop { /// function as many times as dictated by the scheduler; each execution has its scheduling decisions /// resolved by the scheduler, which can make different choices for each execution. #[derive(Debug)] -pub struct Runner { +pub struct Runner { scheduler: Rc>>, + time_model: Rc>, config: Config, } -impl Runner { +impl Runner { /// Construct a new `Runner` that will use the given `Scheduler` to control the test. pub fn new(scheduler: S, config: Config) -> Self { let metrics_scheduler = MetricsScheduler::new(scheduler); Self { scheduler: Rc::new(RefCell::new(metrics_scheduler)), + time_model: Rc::new(RefCell::new(FrozenTimeModel::new())), + config, + } + } +} + +impl Runner { + /// Construct a new `Runner` that will use the given `Scheduler` to control the test. + pub fn new_with_time_model(scheduler: S, time_model: T, config: Config) -> Self { + let metrics_scheduler = MetricsScheduler::new(scheduler); + + Self { + scheduler: Rc::new(RefCell::new(metrics_scheduler)), + time_model: Rc::new(RefCell::new(time_model)), config, } } @@ -95,8 +111,9 @@ impl Runner { None => break, Some(s) => s, }; + self.time_model.borrow_mut().new_execution(); - let execution = Execution::new(self.scheduler.clone(), schedule); + let execution = Execution::new(self.scheduler.clone(), schedule, self.time_model.clone()); let f = Arc::clone(&f); // This is a slightly lazy way to ensure that everything outside of the "execution" span gets @@ -122,7 +139,6 @@ pub struct PortfolioRunner { stop_on_first_failure: bool, config: Config, } - impl PortfolioRunner { /// Construct a new `PortfolioRunner` with no schedulers. If `stop_on_first_failure` is true, /// all schedulers will be terminated as soon as any fails; if false, they will keep running diff --git a/shuttle/src/runtime/thread/continuation.rs b/shuttle/src/runtime/thread/continuation.rs index 5c6f2bdc..5dd03186 100644 --- a/shuttle/src/runtime/thread/continuation.rs +++ b/shuttle/src/runtime/thread/continuation.rs @@ -1,4 +1,5 @@ use crate::runtime::execution::ExecutionState; +use crate::sync::time::get_time_model; use crate::{ContinuationFunctionBehavior, UNGRACEFUL_SHUTDOWN_CONFIG}; use corosensei::Yielder; use corosensei::{stack::DefaultStack, Coroutine, CoroutineResult}; @@ -342,6 +343,7 @@ pub(crate) fn switch() { ContinuationInput::Resume => {} }; } + get_time_model().borrow_mut().step() } #[cfg(test)] diff --git a/shuttle/src/sync/mod.rs b/shuttle/src/sync/mod.rs index 5874c91d..85ecbe59 100644 --- a/shuttle/src/sync/mod.rs +++ b/shuttle/src/sync/mod.rs @@ -7,6 +7,7 @@ pub mod mpsc; mod mutex; mod once; mod rwlock; +pub mod time; pub use barrier::{Barrier, BarrierWaitResult}; pub use condvar::{Condvar, WaitTimeoutResult}; diff --git a/shuttle/src/sync/time/constant_stepped.rs b/shuttle/src/sync/time/constant_stepped.rs new file mode 100644 index 00000000..45d095ab --- /dev/null +++ b/shuttle/src/sync/time/constant_stepped.rs @@ -0,0 +1,146 @@ +use std::{ + cmp::{max, Reverse}, + collections::{BinaryHeap, HashMap}, + task::Waker, +}; + +use tracing::{trace, warn}; + +use crate::{current::TaskId, runtime::execution::ExecutionState}; + +use super::{Duration, Instant, TimeDistribution, TimeModel}; + +/// A time model where time advances by a constant amount for each scheduling step +#[derive(Clone, Debug)] +pub struct ConstantSteppedTimeModel { + distribution: ConstantTimeDistribution, + current_step_size: std::time::Duration, + current_time_elapsed: std::time::Duration, + waiters: BinaryHeap>, + wakers: HashMap, +} + +unsafe impl Send for ConstantSteppedTimeModel {} + +impl ConstantSteppedTimeModel { + /// Create a ConstantSteppedTimeModel + pub fn new(step_size: std::time::Duration) -> Self { + let distribution = ConstantTimeDistribution::new(step_size); + Self { + distribution, + current_step_size: distribution.sample(), + current_time_elapsed: std::time::Duration::from_secs(0), + waiters: BinaryHeap::new(), + wakers: HashMap::new(), + } + } + + fn unblock_expired(&mut self) { + while let Some(waker_key) = self.waiters.peek().and_then(|Reverse((t, _, sleep_id))| { + if *t <= self.current_time_elapsed { + Some(*sleep_id) + } else { + None + } + }) { + _ = self.waiters.pop(); + if let Some(waker) = self.wakers.remove(&waker_key) { + waker.wake(); + } + } + } + + /// Get the currently sleeping tasks and deadlines. May contain duplicates + pub fn get_waiters(&self) -> &[Reverse<(std::time::Duration, TaskId, u64)>] { + self.waiters.as_slice() + } + + /// Manually wake a task without affecting the global clock + pub fn wake_frozen(&mut self, sleep_id: u64) { + if let Some(waker) = self.wakers.remove(&sleep_id) { + waker.wake(); + } + } +} + +impl TimeModel for ConstantSteppedTimeModel { + fn pause(&mut self) { + warn!("Pausing stepped model has no effect") + } + + fn resume(&mut self) { + warn!("Resuming stepped model has no effect") + } + + fn step(&mut self) { + self.current_time_elapsed += self.current_step_size; + trace!("time step to {:?}", self.current_time_elapsed); + self.unblock_expired(); + } + + fn new_execution(&mut self) { + self.current_step_size = self.distribution.sample(); + self.current_time_elapsed = std::time::Duration::from_secs(0); + self.waiters.clear(); + self.wakers.clear(); + } + + fn instant(&self) -> Instant { + Instant::Simulated(self.current_time_elapsed) + } + + fn wake_next(&mut self) -> bool { + if self.waiters.is_empty() { + return false; + } + if let Some(Reverse((time, _, _))) = self.waiters.peek() { + self.current_time_elapsed = max(self.current_time_elapsed, *time); + } + self.unblock_expired(); + true + } + + #[allow(clippy::useless_conversion)] + fn advance(&mut self, dur: Duration) { + self.current_time_elapsed += dur.into(); + } + + fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Option) -> bool { + let deadline = deadline.unwrap_simulated(); + if deadline <= self.current_time_elapsed { + return true; + } + + if let Some(waker) = waker { + let task_id = ExecutionState::with(|s| s.current().id()); + let item = (deadline, task_id, sleep_id); + self.waiters.push(Reverse(item)); + self.wakers.insert(sleep_id, waker); + } + false + } + + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } +} + +/// A constant distribution; each sample returns the same time +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +pub struct ConstantTimeDistribution { + /// The time that will be returned on sampling + pub time: std::time::Duration, +} + +impl ConstantTimeDistribution { + /// Create a new constant time distribution + pub fn new(time: std::time::Duration) -> Self { + Self { time } + } +} + +impl TimeDistribution for ConstantTimeDistribution { + fn sample(&self) -> std::time::Duration { + self.time + } +} diff --git a/shuttle/src/sync/time/frozen.rs b/shuttle/src/sync/time/frozen.rs new file mode 100644 index 00000000..dfdc59e0 --- /dev/null +++ b/shuttle/src/sync/time/frozen.rs @@ -0,0 +1,141 @@ +use std::{cmp::Reverse, collections::HashSet, task::Waker}; + +use tracing::warn; + +use crate::{ + current::{with_labels_for_task, Labels, TaskId}, + runtime::execution::ExecutionState, +}; + +use super::{constant_stepped::ConstantSteppedTimeModel, Duration, Instant, TimeModel}; + +/// A time model where time does not advance unless forced +pub struct FrozenTimeModel { + inner: ConstantSteppedTimeModel, + expired: HashSet, + #[allow(clippy::type_complexity)] + triggers: Vec bool>>, +} + +impl std::fmt::Debug for FrozenTimeModel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FrozenTimeModel") + .field("inner", &self.inner) + .field("expired", &self.expired) + .field("triggers", &format!("[{} triggers]", self.triggers.len())) + .finish() + } +} + +impl FrozenTimeModel { + /// Create a new Frozen time model + pub fn new() -> Self { + Self::default() + } + + /// Expire all timeouts on tasks that satisfy a predicate + pub fn trigger_timeouts(&mut self, trigger: F) + where + F: Fn(&Labels) -> bool + 'static, + { + let num_tasks = ExecutionState::with(|s| s.num_tasks()); + for i in 0..num_tasks { + let task_id = TaskId::from(i); + with_labels_for_task(task_id, |labels| { + if trigger(labels) { + self.expired.insert(task_id); + } + }); + } + + let mut to_wake = Vec::new(); + for Reverse((_, task_id, sleep_id)) in self.inner.get_waiters() { + if self.expired.contains(task_id) { + to_wake.push(*sleep_id); + } + } + + for sleep_id in to_wake { + self.inner.wake_frozen(sleep_id); + } + + self.triggers.push(Box::new(trigger)); + } + + /// Clear all triggers that expire timeouts + pub fn clear_triggers(&mut self) { + self.expired.clear(); + self.triggers.clear(); + } +} + +impl Default for FrozenTimeModel { + fn default() -> Self { + Self { + inner: ConstantSteppedTimeModel::new(std::time::Duration::ZERO), + expired: HashSet::new(), + triggers: Vec::new(), + } + } +} + +impl Clone for FrozenTimeModel { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + expired: self.expired.clone(), + triggers: Vec::new(), // Don't clone triggers + } + } +} + +impl TimeModel for FrozenTimeModel { + fn pause(&mut self) { + warn!("Pausing frozen model has no effect") + } + + fn resume(&mut self) { + warn!("Resuming frozen model has no effect") + } + + fn step(&mut self) {} + + fn new_execution(&mut self) { + self.inner.new_execution(); + self.expired.clear(); + self.triggers.clear(); + } + + fn instant(&self) -> Instant { + self.inner.instant() + } + + fn wake_next(&mut self) -> bool { + self.inner.wake_next() + } + + fn advance(&mut self, dur: Duration) { + self.inner.advance(dur); + } + + fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Option) -> bool { + let task_id = ExecutionState::me(); + for trigger in &self.triggers { + with_labels_for_task(task_id, |labels| { + if trigger(labels) { + self.expired.insert(task_id); + } + }); + } + + if !self.expired.contains(&task_id) { + self.inner.register_sleep(deadline, sleep_id, waker) + } else { + true + } + } + + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } +} diff --git a/shuttle/src/sync/time/mod.rs b/shuttle/src/sync/time/mod.rs new file mode 100644 index 00000000..048db6d9 --- /dev/null +++ b/shuttle/src/sync/time/mod.rs @@ -0,0 +1,825 @@ +//! Time +//! +//! Timing primitives allow Shuttle tests to interact with wall-clock time (Instant, Duration, Timeout, etc.) in a deterministic manner + +use std::cmp::Ordering; +use std::future::Future; +#[cfg(feature = "advanced-time-models")] +use std::ops::Mul; +use std::ops::{Add, AddAssign, Sub, SubAssign}; + +use std::{cell::RefCell, rc::Rc}; + +use std::pin::Pin; + +use pin_project::pin_project; +use std::task::{Context, Poll, Waker}; +use tracing::warn; + +use crate::current::Labels; +use crate::runtime::execution::ExecutionState; + +use crate::runtime::thread; +use crate::sync::time::frozen::FrozenTimeModel; + +/// Constant stepped time model implementation +pub mod constant_stepped; +/// Frozen time model implementation +pub mod frozen; + +/// Returns the current count of created timeout/sleep futures +pub fn timer_count() -> u64 { + ExecutionState::with(|state| state.timer_id_counter) +} + +fn increment_timer_counter() -> u64 { + ExecutionState::with(|state| { + state.timer_id_counter += 1; + state.timer_id_counter + }) +} + +/// A distribution of times which can be sampled +pub trait TimeDistribution { + /// Sample a duration from the given distribution + fn sample(&self) -> D; +} + +/// The trait implemented by each TimeModel +pub trait TimeModel: std::fmt::Debug { + /// Wake the next sleeping task; returns true if there exists a task that was able to be woken. + /// Called when all tasks are blocked to resolve timing based deadlocks (all unblocked tasks are sleeping). + fn wake_next(&mut self) -> bool; + /// Reset the TimeModel state for the next Shuttle iteration + fn new_execution(&mut self); + /// Callback after each scheduling step to allow the TimeModel to update itself + fn step(&mut self); + /// Used to create the TimeModel's Instant struct in functions like Instant::now() + fn instant(&self) -> Instant; + /// Pauses the TimeModel + fn pause(&mut self); + /// Resumes the TimeModel + fn resume(&mut self); + /// Manually advances the TimeModel's clock by a fixed amount + fn advance(&mut self, duration: Duration); + /// Callback for registering a sleep/timeout on the current task. It is up to the TimeModel + /// implementation to determine when to wake the sleeping task. If no waker is provided, then + /// the caller is polling whether it is currently expired but is not yet performing a blocking + /// sleep. + fn register_sleep(&mut self, deadline: Instant, id: u64, waker: Option) -> bool; + /// Downcast to Any for type casting / checking + fn as_any_mut(&mut self) -> &mut dyn std::any::Any; +} + +/// Provides a reference to the current TimeModel for this execution. +/// Uses `Rc::clone` so that ExecutionState isn't already borrowed when running most TimeModel methods +pub fn get_time_model() -> Rc> { + ExecutionState::with(|s| Rc::clone(&s.time_model)) +} + +/// Expire all current timeouts/sleeps requested by tasks whose tags match the +/// given predicate. May not be implemented by all TimeModels. +pub fn trigger_timeouts(trigger: F) +where + F: Fn(&Labels) -> bool + 'static, +{ + match get_time_model() + .borrow_mut() + .as_any_mut() + .downcast_mut::() + { + Some(model) => model.trigger_timeouts(trigger), + None => warn!("trigger_timeouts is only available for the default FrozenTimeModel"), + } +} + +/// Remove all triggers to expire timeouts +pub fn clear_triggers() { + match get_time_model() + .borrow_mut() + .as_any_mut() + .downcast_mut::() + { + Some(model) => model.clear_triggers(), + None => warn!("trigger_timeouts is only available for the default FrozenTimeModel"), + } +} + +#[cfg(feature = "advanced-time-models")] +mod advanced_duration { + use super::*; + + /// A Shuttle duration + #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] + pub enum Duration { + /// A concrete duration value + Std(std::time::Duration), + } + + impl Default for Duration { + fn default() -> Self { + Duration::Std(std::time::Duration::ZERO) + } + } + + impl Duration { + /// The maximum duration. + pub const MAX: Duration = Duration::Std(std::time::Duration::MAX); + /// Zero duration. + pub const ZERO: Duration = Duration::Std(std::time::Duration::ZERO); + + /// Creates a new Duration from the specified number of seconds. + pub fn from_secs(secs: u64) -> Self { + Duration::Std(std::time::Duration::from_secs(secs)) + } + + /// Creates a new Duration from the specified number of milliseconds. + pub fn from_millis(millis: u64) -> Self { + Duration::Std(std::time::Duration::from_millis(millis)) + } + + /// Creates a new Duration from the specified number of microseconds. + pub fn from_micros(micros: u64) -> Self { + Duration::Std(std::time::Duration::from_micros(micros)) + } + + /// Creates a new Duration from the specified number of nanoseconds. + pub fn from_nanos(nanos: u64) -> Self { + Duration::Std(std::time::Duration::from_nanos(nanos)) + } + + /// Returns the total number of nanoseconds contained by this Duration. + pub fn as_nanos(&self) -> u128 { + match self { + Duration::Std(d) => d.as_nanos(), + } + } + + /// Returns the total number of microseconds contained by this Duration. + pub fn as_micros(&self) -> u128 { + self.as_nanos() / 1000 + } + + /// Returns the total number of milliseconds contained by this Duration. + pub fn as_millis(&self) -> u128 { + self.as_micros() / 1000 + } + + /// Checked Duration addition. Computes self + other, returning None if overflow occurred. + pub fn checked_add(&self, other: Duration) -> Option { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => a.checked_add(b).map(Duration::Std), + } + } + + /// Checked Duration subtraction. Computes self - other, returning None if other is greater than self. + pub fn checked_sub(&self, other: Duration) -> Option { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => a.checked_sub(b).map(Duration::Std), + } + } + + /// Checked Duration multiplication. Computes self * other, returning None if overflow occurred. + pub fn checked_mul(&self, b: u32) -> Option { + match self { + Duration::Std(a) => a.checked_mul(b).map(Duration::Std), + } + } + + /// Checked Duration division. Computes self / other, returning None if other is zero or overflow occurred. + pub fn checked_div(&self, rhs: u32) -> Option { + match self { + Duration::Std(a) => a.checked_div(rhs).map(Duration::Std), + } + } + + /// Saturating Duration addition. Computes self + other, returning Duration::MAX if overflow occurred. + pub fn saturating_add(&self, other: Duration) -> Self { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => Duration::Std(a.saturating_add(b)), + } + } + + /// Saturating Duration subtraction. Computes self - other, returning Duration::ZERO if other is greater than self. + pub fn saturating_sub(&self, other: Duration) -> Self { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => Duration::Std(a.saturating_sub(b)), + } + } + + /// Saturating Duration multiplication. Computes self * other, returning Duration::MAX if overflow occurred. + pub fn saturating_mul(&self, rhs: u32) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.saturating_mul(rhs)), + } + } + + /// Multiplies Duration by f32. + pub fn mul_f32(&self, rhs: f32) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.mul_f32(rhs)), + } + } + + /// Multiplies Duration by f64. + pub fn mul_f64(&self, rhs: f64) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.mul_f64(rhs)), + } + } + + /// Divides Duration by f32. + pub fn div_f32(&self, rhs: f32) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.div_f32(rhs)), + } + } + + /// Divides Duration by f64. + pub fn div_f64(&self, rhs: f64) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.div_f64(rhs)), + } + } + + /// Returns the number of seconds contained by this Duration as f64. + pub fn as_secs_f64(&self) -> f64 { + match self { + Duration::Std(d) => d.as_secs_f64(), + } + } + + /// Returns the number of seconds contained by this Duration as f32. + pub fn as_secs_f32(&self) -> f32 { + match self { + Duration::Std(d) => d.as_secs_f32(), + } + } + + /// Returns the total number of whole seconds contained by this Duration. + pub fn as_secs(&self) -> u64 { + match self { + Duration::Std(d) => d.as_secs(), + } + } + + /// Returns the fractional part of this Duration, in whole milliseconds. + pub fn subsec_millis(&self) -> u32 { + match self { + Duration::Std(d) => d.subsec_millis(), + } + } + + /// Returns the fractional part of this Duration, in whole microseconds. + pub fn subsec_micros(&self) -> u32 { + match self { + Duration::Std(d) => d.subsec_micros(), + } + } + + /// Returns the fractional part of this Duration, in nanoseconds. + pub fn subsec_nanos(&self) -> u32 { + match self { + Duration::Std(d) => d.subsec_nanos(), + } + } + + /// Creates a new Duration from the specified number of whole seconds and additional nanoseconds. + pub fn new(secs: u64, nanos: u32) -> Self { + Duration::Std(std::time::Duration::new(secs, nanos)) + } + + /// Creates a new Duration from the specified number of seconds represented as f64. + pub fn from_secs_f64(secs: f64) -> Self { + Duration::Std(std::time::Duration::from_secs_f64(secs)) + } + + /// Creates a new Duration from the specified number of seconds represented as f32. + pub fn from_secs_f32(secs: f32) -> Self { + Duration::Std(std::time::Duration::from_secs_f32(secs)) + } + + pub(crate) fn unwrap_std(self) -> std::time::Duration { + match self { + Duration::Std(d) => d, + } + } + } + + impl Ord for Duration { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => a.cmp(b), + } + } + } + + impl PartialOrd for Duration { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Add for Duration { + type Output = Duration; + + fn add(self, other: Self) -> Self { + self.checked_add(other).unwrap() + } + } + + impl AddAssign for Duration { + fn add_assign(&mut self, other: Self) { + *self = self.checked_add(other).unwrap() + } + } + + impl SubAssign for Duration { + fn sub_assign(&mut self, other: Self) { + *self = self.checked_sub(other).unwrap() + } + } + + impl Mul for Duration { + type Output = Duration; + + fn mul(self, other: u32) -> Self { + self.checked_mul(other).unwrap() + } + } + + impl Mul for u32 { + type Output = Duration; + + fn mul(self, other: Duration) -> Duration { + other.checked_mul(self).unwrap() + } + } + + impl std::ops::Div for Duration { + type Output = Duration; + + fn div(self, rhs: u32) -> Duration { + self.checked_div(rhs).unwrap() + } + } + + impl std::ops::DivAssign for Duration { + fn div_assign(&mut self, rhs: u32) { + *self = self.checked_div(rhs).unwrap() + } + } + + impl From for Duration { + fn from(d: std::time::Duration) -> Self { + Duration::Std(d) + } + } + + impl From for std::time::Duration { + fn from(d: Duration) -> Self { + d.unwrap_std() + } + } + + impl std::ops::Sub for Duration { + type Output = Duration; + + fn sub(self, other: Self) -> Self { + self.checked_sub(other).unwrap() + } + } +} + +#[cfg(feature = "advanced-time-models")] +pub use advanced_duration::Duration; + +#[cfg(not(feature = "advanced-time-models"))] +pub use std::time::Duration; + +/// A Shuttle Instant +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum Instant { + /// Deterministically simulated clock time represented by a Duration from the start of the test + Simulated(std::time::Duration), +} + +#[cfg(feature = "advanced-time-models")] +impl Instant { + /// Returns Some(t) where t is the time self - duration if t can be represented as Instant (which means it’s inside the bounds of the underlying data structure), None otherwise. + pub fn checked_sub(&self, duration: Duration) -> Option { + match self { + Instant::Simulated(a) => match duration { + Duration::Std(b) => a.checked_sub(b).map(Instant::Simulated), + }, + } + } + + /// Returns the amount of time elapsed from another instant to this one, or None if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can return None. + pub fn checked_duration_since(&self, earlier: Instant) -> Option { + match (self, earlier) { + (Instant::Simulated(a), Instant::Simulated(b)) => a.checked_sub(b).map(Duration::Std), + } + } + + /// Returns the amount of time elapsed from another instant to this one, or panics if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can panic. + pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { + { + self.checked_duration_since(earlier).unwrap_or(Duration::ZERO) + } + } + + /// Returns Some(t) where t is the time self + duration if t can be represented as Instant (which means it's inside the bounds + /// of the underlying data structure), None otherwise. + pub fn checked_add(&self, duration: Duration) -> Option { + match self { + Instant::Simulated(a) => match duration { + Duration::Std(b) => a.checked_add(b).map(Instant::Simulated), + }, + } + } +} + +#[cfg(not(feature = "advanced-time-models"))] +impl Instant { + /// Returns Some(t) where t is the time self - duration if t can be represented as Instant (which means it’s inside the bounds of the underlying data structure), None otherwise. + pub fn checked_sub(&self, duration: Duration) -> Option { + match self { + Instant::Simulated(a) => a.checked_sub(duration).map(Instant::Simulated), + } + } + + /// Returns the amount of time elapsed from another instant to this one, or None if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can return None. + pub fn checked_duration_since(&self, earlier: Instant) -> Option { + match (self, earlier) { + (Instant::Simulated(a), Instant::Simulated(b)) => a.checked_sub(b), + } + } + + /// Returns the amount of time elapsed from another instant to this one, or panics if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can panic. + pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { + { + self.checked_duration_since(earlier) + .unwrap_or(std::time::Duration::ZERO) + } + } + + /// Returns Some(t) where t is the time self + duration if t can be represented as Instant (which means it's inside the bounds + /// of the underlying data structure), None otherwise. + pub fn checked_add(&self, duration: Duration) -> Option { + match self { + Instant::Simulated(a) => a.checked_add(duration).map(Instant::Simulated), + } + } +} + +impl Instant { + /// Returns an instant corresponding to “now”. + pub fn now() -> Self { + get_time_model().borrow().instant() + } + + /// Cast this instant to a simulated time represented by a std::time::Duration + pub fn unwrap_simulated(self) -> std::time::Duration { + match self { + Instant::Simulated(d) => d, + } + } + + /// Returns the amount of time elapsed from another instant to this one, or panics if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can panic. + pub fn duration_since(&self, earlier: Instant) -> Duration { + self.checked_duration_since(earlier).unwrap() + } + + /// Returns the amount of time elapsed since this instant. + /// Previous Rust versions panicked when the current time was earlier than self. Currently this method returns a Duration of + /// zero in that case. Future versions may reintroduce the panic. + pub fn elapsed(&self) -> Duration { + Instant::now() + .checked_duration_since(*self) + .unwrap_or(Duration::from_secs(0)) + } + + /// Returns t where t is the time self + duration if t can be represented as Instant otherwise it saturates to the maximum time value + pub fn saturating_add(&self, duration: Duration) -> Self { + match self { + Instant::Simulated(_) => self.checked_add(duration).unwrap_or(Instant::Simulated(Duration::MAX)), + } + } +} + +impl Add for Instant { + type Output = Instant; + + fn add(self, other: Duration) -> Instant { + self.checked_add(other).unwrap() + } +} + +impl AddAssign for Instant { + fn add_assign(&mut self, other: Duration) { + *self = self.checked_add(other).unwrap() + } +} + +impl Sub for Instant { + type Output = Duration; + + fn sub(self, earlier: Instant) -> Duration { + self.checked_duration_since(earlier).unwrap() + } +} + +impl SubAssign for Instant { + fn sub_assign(&mut self, other: Duration) { + *self = *self - other + } +} + +impl Sub for Instant { + type Output = Instant; + + fn sub(self, other: Duration) -> Instant { + self.checked_sub(other).unwrap() + } +} + +impl Ord for Instant { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (Instant::Simulated(a), Instant::Simulated(b)) => a.cmp(b), + } + } +} + +impl PartialOrd for Instant { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// Puts the current thread to sleep +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn sleep(dur: Duration) { + if dur == Duration::ZERO { + thread::switch(); + return; + } + crate::future::block_on(async_sleep(dur)); +} + +/// Advances the current global time without putting the current thread to sleep +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn advance(dur: Duration) { + get_time_model().borrow_mut().advance(dur); + thread::switch(); +} + +/// Returns a future which sleeps until the duration has elapsed +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn async_sleep(dur: Duration) -> Sleep { + let id = increment_timer_counter(); + Sleep { + id, + deadline: Instant::now().saturating_add(dur), + } +} + +/// Returns a future which sleeps until the deadline is reached +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn async_sleep_until(deadline: Instant) -> Sleep { + let id = increment_timer_counter(); + Sleep { id, deadline } +} + +/// Returns a struct which sleeps repeatedly at a fixed time intervals (a tokio::time::Interval) +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn async_interval(dur: Duration) -> Interval { + Interval { + start: None, + ticks: 0, + current_interval: None, + period: dur, + } +} + +/// Returns a struct which sleeps repeatedly at a fixed time intervals (a tokio::time::Interval) +/// This Interval starts at a fixed start time. +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn async_interval_at(start: Instant, period: Duration) -> Interval { + Interval { + start: Some(start), + ticks: 0, + current_interval: None, + period, + } +} + +/// A future which returns Poll::Pending until its deadline +#[pin_project] +#[derive(Debug)] +pub struct Sleep { + id: u64, + deadline: Instant, +} + +impl Future for Sleep { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let is_expired = get_time_model() + .borrow_mut() + .register_sleep(self.deadline, self.id, None); + if is_expired { + Poll::Ready(()) + } else { + let _ = get_time_model() + .borrow_mut() + .register_sleep(self.deadline, self.id, Some(cx.waker().clone())); + Poll::Pending + } + } +} + +impl Sleep { + /// Returns the instant at which the future will complete. + pub fn deadline(&self) -> Instant { + self.deadline + } + + /// Returns `true` if `Sleep` has elapsed. + /// + /// A `Sleep` instance is elapsed when the requested duration has elapsed. + pub fn is_elapsed(&self) -> bool { + self.deadline.checked_duration_since(Instant::now()).is_none() + } + + /// Resets the `Sleep` instance to a new deadline. + pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + let me = self.project(); + *me.deadline = deadline; + } +} + +/// Timeout a future +#[pin_project] +#[derive(Debug)] +pub struct Interval { + start: Option, + ticks: u32, + period: Duration, + current_interval: Option>>, +} + +/// Missed tick behavior for Interval +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MissedTickBehavior { + /// Ticks as fast as possible until caught up. + Burst, + /// Tick at multiples of period from when tick was called, rather than from start. + Delay, + /// Skips missed ticks and tick on the next multiple of period from start. + Skip, +} + +impl Interval { + /// tick + pub async fn tick(&mut self) -> Instant { + let deadline = self.next_deadline(); + async_sleep_until(deadline).await; + self.ticks += 1; + deadline + } + + /// period for an Interval + pub fn period(&self) -> Duration { + self.period + } + + fn next_deadline(&mut self) -> Instant { + if let Some(start) = self.start { + let mut total_duration = Duration::ZERO; + total_duration += self.period * self.ticks; + start.checked_add(total_duration).unwrap() + } else { + let now = Instant::now(); + self.start = Some(now); + now + } + } + + /// poll tick + pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll { + let deadline = self.next_deadline(); + if self.current_interval.is_none() { + self.current_interval = Some(Box::pin(async_sleep_until(deadline))); + } + + match self.current_interval.as_mut().unwrap().as_mut().poll(cx) { + Poll::Ready(_) => { + self.current_interval = None; + Poll::Ready(deadline) + } + Poll::Pending => Poll::Pending, + } + } + + /// reset + pub fn reset(&mut self) { + self.start = None; + self.ticks = 0; + if let Some(x) = self.current_interval.as_mut() { + x.as_mut().reset(Instant::now()); + } + } + + /// Unimplemented + pub fn set_missed_tick_behavior(&mut self, _behavior: MissedTickBehavior) { + warn!("set missed tick behavior unimplemented: no effect!"); + } + + /// Unimplemented + pub fn missed_tick_behavior(&mut self) -> MissedTickBehavior { + warn!("set missed tick behavior unimplemented: no effect!"); + MissedTickBehavior::Burst + } +} + +/// Timeout a future +pub fn async_timeout(d: Duration, f: F) -> Timeout +where + F: Future, +{ + let id = increment_timer_counter(); + Timeout { + id, + deadline: Instant::now().saturating_add(d), + future: f, + } +} + +/// Timeout a future +#[pin_project] +#[derive(Debug)] +pub struct Timeout +where + F: Future, +{ + id: u64, + deadline: Instant, + #[pin] + future: F, +} + +/// Elapsed time error variant +#[derive(Debug, PartialEq, Eq)] +pub struct Elapsed(()); + +impl std::fmt::Display for Elapsed { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + "deadline has elapsed".fmt(fmt) + } +} + +impl std::error::Error for Elapsed {} + +impl From for std::io::Error { + fn from(_err: Elapsed) -> std::io::Error { + std::io::ErrorKind::TimedOut.into() + } +} + +impl Future for Timeout +where + F: Future, +{ + type Output = std::result::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let tm = get_time_model(); + let expired = tm.borrow_mut().register_sleep(*this.deadline, *this.id, None); + if expired { + return Poll::Ready(Err(Elapsed(()))); + } + + match this.future.poll(cx) { + Poll::Pending => { + let expired = tm + .borrow_mut() + .register_sleep(*this.deadline, *this.id, Some(cx.waker().clone())); + if expired { + return Poll::Ready(Err(Elapsed(()))); + } + Poll::Pending + } + Poll::Ready(x) => Poll::Ready(Ok(x)), + } + } +} diff --git a/shuttle/src/thread.rs b/shuttle/src/thread.rs index 5d945ac3..4db29ef7 100644 --- a/shuttle/src/thread.rs +++ b/shuttle/src/thread.rs @@ -3,11 +3,13 @@ use crate::runtime::execution::ExecutionState; use crate::runtime::task::TaskId; use crate::runtime::thread; +use crate::sync::time::Duration; +use std::fmt::Debug; use std::marker::PhantomData; use std::panic::Location; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::time::Duration; +pub use crate::sync::time::sleep; pub use std::thread::{panicking, Result}; /// A unique identifier for a running thread @@ -348,12 +350,6 @@ pub fn yield_now() { thread::switch(); } -/// Puts the current thread to sleep for at least the specified amount of time. -// Note that Shuttle does not model time, so this behaves just like a context switch. -pub fn sleep(_dur: Duration) { - thread::switch(); -} - /// Get a handle to the thread that invokes it pub fn current() -> Thread { let (task_id, name) = ExecutionState::with(|s| { diff --git a/shuttle/tests/basic/mod.rs b/shuttle/tests/basic/mod.rs index 88bd38d1..b193b898 100644 --- a/shuttle/tests/basic/mod.rs +++ b/shuttle/tests/basic/mod.rs @@ -20,6 +20,7 @@ mod shrink; mod tag; mod task; mod thread; +mod time; mod timeout; mod tracing; mod uncontrolled_nondeterminism; diff --git a/shuttle/tests/basic/pct.rs b/shuttle/tests/basic/pct.rs index 1ff4f6a3..3c5342e6 100644 --- a/shuttle/tests/basic/pct.rs +++ b/shuttle/tests/basic/pct.rs @@ -1,10 +1,10 @@ use shuttle::scheduler::PctScheduler; +use shuttle::sync::time::Duration; use shuttle::sync::Mutex; use shuttle::{check_pct, check_random, thread, Config, MaxSteps, Runner}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::Duration; use test_log::test; const TEST_LENGTH: usize = 20; @@ -18,7 +18,7 @@ fn figure5() { thread::spawn(move || { for _ in 0..TEST_LENGTH { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } *lock_clone.lock().unwrap() = 1; @@ -78,7 +78,7 @@ fn yield_spin_loop(use_yield: bool) { if use_yield { thread::yield_now(); } else { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } } }); @@ -110,21 +110,21 @@ fn figure1a_pct() { thread::spawn(move || { for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } *t1.lock().unwrap() = Some(1); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); thread::spawn(move || { for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let _ = t2.lock().unwrap().expect("null dereference"); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); }); @@ -141,7 +141,7 @@ fn figure1b(num_threads: usize) { for _ in 0..num_threads - 2 { thread::spawn(|| { for _ in 0..5 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); } @@ -149,17 +149,17 @@ fn figure1b(num_threads: usize) { // Main worker threads take 10 steps each thread::spawn(move || { for _ in 0..5 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } *x1.lock().unwrap() = None; for _ in 0..4 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); thread::spawn(move || { for _ in 0..4 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let b = { let b = x2.lock().unwrap().is_some(); @@ -169,7 +169,7 @@ fn figure1b(num_threads: usize) { let _ = x2.lock().unwrap().expect("null dereference"); } for _ in 0..4 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); } @@ -216,30 +216,30 @@ fn figure_1c() { thread::spawn(move || { for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let a = a1.lock().unwrap(); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let b = b1.lock().unwrap(); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } assert_eq!(*a + *b, 0) }); thread::spawn(move || { for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let b = b2.lock().unwrap(); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let a = a2.lock().unwrap(); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } assert_eq!(*a + *b, 0); }); diff --git a/shuttle/tests/basic/time.rs b/shuttle/tests/basic/time.rs new file mode 100644 index 00000000..6828ebc0 --- /dev/null +++ b/shuttle/tests/basic/time.rs @@ -0,0 +1,357 @@ +use shuttle::current::{me, set_label_for_task}; +use shuttle::scheduler::{DfsScheduler, RandomScheduler}; +use shuttle::sync::time::constant_stepped::ConstantSteppedTimeModel; +use shuttle::sync::time::frozen::FrozenTimeModel; +use shuttle::sync::time::{ + async_interval, async_sleep, async_timeout, clear_triggers, trigger_timeouts, Duration, Instant, +}; +use shuttle::{future, thread, Config, Runner}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tracing::trace; + +#[test] +fn test_stepped_blocking_sleep() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + let start = Instant::now(); + thread::sleep(Duration::from_millis(50) + Duration::from_millis(50)); + let elapsed = start.elapsed(); + assert!(elapsed.as_millis() == 100); + }); +} + +/// This test includes 3 scheduling points which should advance the global time: +/// - 1 spawn and 1 yield on the main thread +/// - 1 yield on the child thread +/// +/// Thus with a stepped time, it should be possible for either side of the condition to be met: +/// - True => both threads have executed (3 events * time_step) +/// - False => only the main thread has executed (2 events * time_step) +#[test] +fn test_stepped_elapsed_time() { + let less_than_count = Arc::new(AtomicUsize::new(0)); + let greater_than_count = Arc::new(AtomicUsize::new(0)); + + let time_step = Duration::from_micros(10); + + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = DfsScheduler::new(None, false); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + + let less_count_inner = less_than_count.clone(); + let greater_count_inner = greater_than_count.clone(); + + runner.run(move || { + let start = Instant::now(); + thread::spawn(|| { + thread::yield_now(); + }); + thread::yield_now(); + let elapsed = start.elapsed(); + trace!("elapsed {:?}", elapsed); + + if elapsed > 2 * time_step { + greater_count_inner.fetch_add(1, Ordering::SeqCst); + } else { + less_count_inner.fetch_add(1, Ordering::SeqCst); + } + }); + + let less_count = less_than_count.load(Ordering::SeqCst); + let greater_count = greater_than_count.load(Ordering::SeqCst); + + assert!( + less_count > 0, + "Expected some executions with elapsed <= {:?}, got {}", + time_step * 2, + less_count + ); + assert!( + greater_count > 0, + "Expected some executions with elapsed > {:?}, got {}", + time_step * 2, + greater_count + ); +} + +#[test] +fn test_stepped_async_sleep() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + future::block_on(async { + let start = Instant::now(); + async_sleep(Duration::from_millis(50)).await; + let elapsed = start.elapsed(); + assert_eq!(elapsed.as_millis(), 50); + }); + }); +} + +#[test] +fn test_stepped_timeout_expired() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + future::block_on(async { + let start = Instant::now(); + let result = async_timeout(Duration::from_millis(50), async { + async_sleep(Duration::from_millis(100)).await; + 42 + }) + .await; + assert!(result.is_err()); + assert_eq!(start.elapsed().as_millis(), 50); + }); + }); +} + +#[test] +fn test_stepped_timeout_not_expired() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + future::block_on(async { + let result = async_timeout(Duration::from_millis(50), async { + async_sleep(Duration::from_millis(20)).await; + 42 + }) + .await; + assert_eq!(result.unwrap(), 42); + }); + }); +} + +#[test] +fn test_async_interval() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + future::block_on(async { + let mut interval = async_interval(Duration::from_millis(10)); + let start = Instant::now(); + interval.tick().await; + let first_tick = start.elapsed(); + interval.tick().await; + let second_tick = start.elapsed(); + interval.tick().await; + let third_tick = start.elapsed(); + + assert_eq!(first_tick.as_millis(), 0); + assert_eq!(second_tick.as_millis(), 10); + assert_eq!(third_tick.as_millis(), 20); + }); + }); +} + +#[derive(Clone, Debug, PartialEq)] +struct TaskType(String); + +/// Probabilistically force a switch for random schedulers [P(no switch) = 1/T^bound for T threads] +/// Returns true if the condition ever holds while spinning. +fn spin_switch_and_get_any(condition: F) -> bool +where + F: Fn() -> bool, +{ + let bound = 10000; + for _ in 0..bound { + thread::yield_now(); + if condition() { + return true; + }; + } + false +} + +#[test] +fn test_stepped_sleep_woken_by_thread_steps() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_millis(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + + runner.run(|| { + let sleep_completed = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let sleep_completed_clone = sleep_completed.clone(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + sleep_completed_clone.store(true, Ordering::SeqCst); + }); + + // Take many steps to advance time and wake the sleeping thread + assert!(spin_switch_and_get_any(|| sleep_completed.load(Ordering::SeqCst))); + }); +} + +#[test] +fn test_frozen_trigger_timeouts() { + let time_model = FrozenTimeModel::new(); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + + runner.run(|| { + let woken = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let woken_clone = woken.clone(); + + let handle = thread::spawn(move || { + set_label_for_task(me(), TaskType("target".to_string())); + + future::block_on(async { + async_sleep(Duration::from_millis(100)).await; + woken_clone.store(true, Ordering::SeqCst); + }); + }); + + // Verify the task has not completed yet because it is asleep + assert!(!spin_switch_and_get_any(|| woken.load(Ordering::SeqCst))); + + // Trigger timeouts for tasks with "target" label + trigger_timeouts(|labels| labels.get::().is_some_and(|t| t.0 == "target")); + + // Verify the task completed after trigger_timeouts + assert!(spin_switch_and_get_any(|| woken.load(Ordering::SeqCst))); + + // Wait for the task to complete + handle.join().unwrap(); + }); +} + +#[test] +fn test_frozen_trigger_timeouts_async_timeout() { + let time_model = FrozenTimeModel::new(); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + + runner.run(|| { + let timeout_triggered = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let timeout_triggered_clone = timeout_triggered.clone(); + + let handle = thread::spawn(move || { + set_label_for_task(me(), TaskType("target".to_string())); + + future::block_on(async { + let result = async_timeout(Duration::from_millis(100), async { + async_sleep(Duration::from_millis(200)).await; + 42 + }) + .await; + + timeout_triggered_clone.store(result.is_err(), Ordering::SeqCst); + }); + }); + + // Verify the timeout has not triggered yet + assert!(!spin_switch_and_get_any(|| timeout_triggered.load(Ordering::SeqCst))); + + // Trigger timeouts for tasks with "target" label + trigger_timeouts(|labels| labels.get::().is_some_and(|t| t.0 == "target")); + + // Verify the timeout was triggered + assert!(spin_switch_and_get_any(|| timeout_triggered.load(Ordering::SeqCst))); + + // Wait for the task to complete + handle.join().unwrap(); + }); +} + +#[test] +fn test_frozen_trigger_timeouts_selective() { + let time_model = FrozenTimeModel::new(); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + + runner.run(|| { + let other_woken = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + let other_woken_clone = other_woken.clone(); + + let other_handle = thread::spawn(move || { + set_label_for_task(me(), TaskType("other".to_string())); + + future::block_on(async { + async_sleep(Duration::from_millis(100)).await; + other_woken_clone.store(true, Ordering::SeqCst); + }); + }); + + // Verify the task hasn't completed yet + assert!(!spin_switch_and_get_any(|| other_woken.load(Ordering::SeqCst))); + + // Trigger timeouts only for "target" tasks, not "other" tasks + trigger_timeouts(|labels| labels.get::().is_some_and(|t| t.0 == "target")); + + // Verify the "other" task is still sleeping (not woken by selective trigger) + assert!(!spin_switch_and_get_any(|| other_woken.load(Ordering::SeqCst))); + + other_handle.join().unwrap(); + }); +} + +#[test] +fn test_frozen_trigger_timeouts_before_timeout_created() { + let time_model = FrozenTimeModel::new(); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + + runner.run(|| { + let timeout_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let timeout_expired_clone = timeout_expired.clone(); + + // Trigger timeouts before creating any timeout + trigger_timeouts(|_| true); + + let _handle = thread::spawn(move || { + future::block_on(async { + let start = Instant::now(); + let result = async_timeout(Duration::from_millis(100), async { + async_sleep(Duration::from_millis(200)).await; + 42 + }) + .await; + + // Timeout should expire immediately without advancing time + assert!(result.is_err()); + assert_eq!(start.elapsed(), Duration::ZERO); + timeout_expired_clone.store(true, Ordering::SeqCst); + }); + }); + + assert!(spin_switch_and_get_any(|| timeout_expired.load(Ordering::SeqCst))); + }); +} + +#[test] +fn test_frozen_clear_triggers() { + let time_model = FrozenTimeModel::new(); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + + runner.run(|| { + trigger_timeouts(|_| true); + clear_triggers(); + + let handle = thread::spawn(move || { + future::block_on(async { + let start = Instant::now(); + let result = async_timeout(Duration::from_millis(100), async { + async_sleep(Duration::from_millis(200)).await; + 42 + }) + .await; + + // Timeout should expire by advancing time when no other threads are runnable + assert!(result.is_err()); + assert_eq!(start.elapsed().as_millis(), 100); + }); + }); + + handle.join().unwrap(); + }); +} From 6443a0c62da1f0e030e7694080eff3400cda71d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Tue, 27 Jan 2026 17:04:55 -0800 Subject: [PATCH 2/8] Move TimeModels out of sync --- shuttle/src/lib.rs | 1 + shuttle/src/runtime/execution.rs | 2 +- shuttle/src/runtime/runner.rs | 2 +- shuttle/src/runtime/thread/continuation.rs | 2 +- shuttle/src/sync/mod.rs | 1 - shuttle/src/thread.rs | 4 ++-- shuttle/src/{sync => }/time/constant_stepped.rs | 0 shuttle/src/{sync => }/time/frozen.rs | 0 shuttle/src/{sync => }/time/mod.rs | 10 +++++++++- shuttle/tests/basic/pct.rs | 2 +- shuttle/tests/basic/time.rs | 8 +++----- 11 files changed, 19 insertions(+), 13 deletions(-) rename shuttle/src/{sync => }/time/constant_stepped.rs (100%) rename shuttle/src/{sync => }/time/frozen.rs (100%) rename shuttle/src/{sync => }/time/mod.rs (99%) diff --git a/shuttle/src/lib.rs b/shuttle/src/lib.rs index 75eb0857..f0743174 100644 --- a/shuttle/src/lib.rs +++ b/shuttle/src/lib.rs @@ -187,6 +187,7 @@ pub mod lazy_static; pub mod rand; pub mod sync; pub mod thread; +pub mod time; pub mod current; pub mod scheduler; diff --git a/shuttle/src/runtime/execution.rs b/shuttle/src/runtime/execution.rs index 287192e2..527afb09 100644 --- a/shuttle/src/runtime/execution.rs +++ b/shuttle/src/runtime/execution.rs @@ -6,9 +6,9 @@ use crate::runtime::task::{ChildLabelFn, Task, TaskId, TaskName, TaskSignature, use crate::runtime::thread; use crate::runtime::thread::continuation::PooledContinuation; use crate::scheduler::{Schedule, Scheduler}; -use crate::sync::time::{get_time_model, TimeModel}; use crate::sync::{ResourceSignature, ResourceType}; use crate::thread::thread_fn; +use crate::time::{get_time_model, TimeModel}; use crate::{backtrace_enabled, Config, MaxSteps, UNGRACEFUL_SHUTDOWN_CONFIG}; use scoped_tls::scoped_thread_local; use smallvec::SmallVec; diff --git a/shuttle/src/runtime/runner.rs b/shuttle/src/runtime/runner.rs index 974f4163..c2a327fb 100644 --- a/shuttle/src/runtime/runner.rs +++ b/shuttle/src/runtime/runner.rs @@ -3,7 +3,7 @@ use crate::runtime::task::{Task, TaskId}; use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL}; use crate::scheduler::metrics::MetricsScheduler; use crate::scheduler::{Schedule, Scheduler}; -use crate::sync::time::{frozen::FrozenTimeModel, TimeModel}; +use crate::time::{frozen::FrozenTimeModel, TimeModel}; use crate::Config; use std::cell::RefCell; use std::fmt; diff --git a/shuttle/src/runtime/thread/continuation.rs b/shuttle/src/runtime/thread/continuation.rs index 5dd03186..84520eb2 100644 --- a/shuttle/src/runtime/thread/continuation.rs +++ b/shuttle/src/runtime/thread/continuation.rs @@ -1,5 +1,5 @@ use crate::runtime::execution::ExecutionState; -use crate::sync::time::get_time_model; +use crate::time::get_time_model; use crate::{ContinuationFunctionBehavior, UNGRACEFUL_SHUTDOWN_CONFIG}; use corosensei::Yielder; use corosensei::{stack::DefaultStack, Coroutine, CoroutineResult}; diff --git a/shuttle/src/sync/mod.rs b/shuttle/src/sync/mod.rs index 85ecbe59..5874c91d 100644 --- a/shuttle/src/sync/mod.rs +++ b/shuttle/src/sync/mod.rs @@ -7,7 +7,6 @@ pub mod mpsc; mod mutex; mod once; mod rwlock; -pub mod time; pub use barrier::{Barrier, BarrierWaitResult}; pub use condvar::{Condvar, WaitTimeoutResult}; diff --git a/shuttle/src/thread.rs b/shuttle/src/thread.rs index 4db29ef7..de7316e9 100644 --- a/shuttle/src/thread.rs +++ b/shuttle/src/thread.rs @@ -3,13 +3,13 @@ use crate::runtime::execution::ExecutionState; use crate::runtime::task::TaskId; use crate::runtime::thread; -use crate::sync::time::Duration; +use crate::time::Duration; use std::fmt::Debug; use std::marker::PhantomData; use std::panic::Location; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -pub use crate::sync::time::sleep; +pub use crate::time::sleep; pub use std::thread::{panicking, Result}; /// A unique identifier for a running thread diff --git a/shuttle/src/sync/time/constant_stepped.rs b/shuttle/src/time/constant_stepped.rs similarity index 100% rename from shuttle/src/sync/time/constant_stepped.rs rename to shuttle/src/time/constant_stepped.rs diff --git a/shuttle/src/sync/time/frozen.rs b/shuttle/src/time/frozen.rs similarity index 100% rename from shuttle/src/sync/time/frozen.rs rename to shuttle/src/time/frozen.rs diff --git a/shuttle/src/sync/time/mod.rs b/shuttle/src/time/mod.rs similarity index 99% rename from shuttle/src/sync/time/mod.rs rename to shuttle/src/time/mod.rs index 048db6d9..51accbed 100644 --- a/shuttle/src/sync/time/mod.rs +++ b/shuttle/src/time/mod.rs @@ -20,7 +20,7 @@ use crate::current::Labels; use crate::runtime::execution::ExecutionState; use crate::runtime::thread; -use crate::sync::time::frozen::FrozenTimeModel; +use crate::time::frozen::FrozenTimeModel; /// Constant stepped time model implementation pub mod constant_stepped; @@ -50,23 +50,31 @@ pub trait TimeModel: std::fmt::Debug { /// Wake the next sleeping task; returns true if there exists a task that was able to be woken. /// Called when all tasks are blocked to resolve timing based deadlocks (all unblocked tasks are sleeping). fn wake_next(&mut self) -> bool; + /// Reset the TimeModel state for the next Shuttle iteration fn new_execution(&mut self); + /// Callback after each scheduling step to allow the TimeModel to update itself fn step(&mut self); + /// Used to create the TimeModel's Instant struct in functions like Instant::now() fn instant(&self) -> Instant; + /// Pauses the TimeModel fn pause(&mut self); + /// Resumes the TimeModel fn resume(&mut self); + /// Manually advances the TimeModel's clock by a fixed amount fn advance(&mut self, duration: Duration); + /// Callback for registering a sleep/timeout on the current task. It is up to the TimeModel /// implementation to determine when to wake the sleeping task. If no waker is provided, then /// the caller is polling whether it is currently expired but is not yet performing a blocking /// sleep. fn register_sleep(&mut self, deadline: Instant, id: u64, waker: Option) -> bool; + /// Downcast to Any for type casting / checking fn as_any_mut(&mut self) -> &mut dyn std::any::Any; } diff --git a/shuttle/tests/basic/pct.rs b/shuttle/tests/basic/pct.rs index 3c5342e6..4defc795 100644 --- a/shuttle/tests/basic/pct.rs +++ b/shuttle/tests/basic/pct.rs @@ -1,6 +1,6 @@ use shuttle::scheduler::PctScheduler; -use shuttle::sync::time::Duration; use shuttle::sync::Mutex; +use shuttle::time::Duration; use shuttle::{check_pct, check_random, thread, Config, MaxSteps, Runner}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; diff --git a/shuttle/tests/basic/time.rs b/shuttle/tests/basic/time.rs index 6828ebc0..0e447f00 100644 --- a/shuttle/tests/basic/time.rs +++ b/shuttle/tests/basic/time.rs @@ -1,10 +1,8 @@ use shuttle::current::{me, set_label_for_task}; use shuttle::scheduler::{DfsScheduler, RandomScheduler}; -use shuttle::sync::time::constant_stepped::ConstantSteppedTimeModel; -use shuttle::sync::time::frozen::FrozenTimeModel; -use shuttle::sync::time::{ - async_interval, async_sleep, async_timeout, clear_triggers, trigger_timeouts, Duration, Instant, -}; +use shuttle::time::constant_stepped::ConstantSteppedTimeModel; +use shuttle::time::frozen::FrozenTimeModel; +use shuttle::time::{async_interval, async_sleep, async_timeout, clear_triggers, trigger_timeouts, Duration, Instant}; use shuttle::{future, thread, Config, Runner}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; From 2e36181cf811283fd8ce379cdc655ce08b204839 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= <44001885+sarsko@users.noreply.github.com> Date: Thu, 29 Jan 2026 00:00:39 -0800 Subject: [PATCH 3/8] Make `Sleep` `must_use` --- shuttle/src/time/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/shuttle/src/time/mod.rs b/shuttle/src/time/mod.rs index 51accbed..8d4cb826 100644 --- a/shuttle/src/time/mod.rs +++ b/shuttle/src/time/mod.rs @@ -631,6 +631,7 @@ pub fn async_interval_at(start: Instant, period: Duration) -> Interval { /// A future which returns Poll::Pending until its deadline #[pin_project] #[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Sleep { id: u64, deadline: Instant, From adfab47bedb997f5566687c1da3ec333174340df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= <44001885+sarsko@users.noreply.github.com> Date: Thu, 29 Jan 2026 00:57:34 -0800 Subject: [PATCH 4/8] Refactor async_sleep --- shuttle/src/time/mod.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/shuttle/src/time/mod.rs b/shuttle/src/time/mod.rs index 8d4cb826..4061f93b 100644 --- a/shuttle/src/time/mod.rs +++ b/shuttle/src/time/mod.rs @@ -591,11 +591,7 @@ pub fn advance(dur: Duration) { /// Returns a future which sleeps until the duration has elapsed /// Behavior of this function depends on the TimeModel provided to Shuttle pub fn async_sleep(dur: Duration) -> Sleep { - let id = increment_timer_counter(); - Sleep { - id, - deadline: Instant::now().saturating_add(dur), - } + async_sleep_until(Instant::now().saturating_add(dur)) } /// Returns a future which sleeps until the deadline is reached From ad7f396abea08cbfca2872070c70434c35875599 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Thu, 5 Feb 2026 17:30:00 -0800 Subject: [PATCH 5/8] Remove trigger_timeouts related functionality --- shuttle/src/runtime/execution.rs | 4 -- shuttle/src/time/constant_stepped.rs | 4 -- shuttle/src/time/frozen.rs | 99 +++------------------------- shuttle/src/time/mod.rs | 33 ---------- 4 files changed, 8 insertions(+), 132 deletions(-) diff --git a/shuttle/src/runtime/execution.rs b/shuttle/src/runtime/execution.rs index 527afb09..976703c8 100644 --- a/shuttle/src/runtime/execution.rs +++ b/shuttle/src/runtime/execution.rs @@ -817,10 +817,6 @@ impl ExecutionState { Self::with(|state| state.context_switches) } - pub(crate) fn num_tasks(&self) -> usize { - self.tasks.len() - } - #[track_caller] pub(crate) fn new_resource_signature(resource_type: ResourceType) -> ResourceSignature { ExecutionState::with(|s| s.current_mut().signature.new_resource(resource_type)) diff --git a/shuttle/src/time/constant_stepped.rs b/shuttle/src/time/constant_stepped.rs index 45d095ab..21a8d187 100644 --- a/shuttle/src/time/constant_stepped.rs +++ b/shuttle/src/time/constant_stepped.rs @@ -119,10 +119,6 @@ impl TimeModel for ConstantSteppedTimeModel { } false } - - fn as_any_mut(&mut self) -> &mut dyn std::any::Any { - self - } } /// A constant distribution; each sample returns the same time diff --git a/shuttle/src/time/frozen.rs b/shuttle/src/time/frozen.rs index dfdc59e0..7a190dd5 100644 --- a/shuttle/src/time/frozen.rs +++ b/shuttle/src/time/frozen.rs @@ -1,30 +1,11 @@ -use std::{cmp::Reverse, collections::HashSet, task::Waker}; - -use tracing::warn; - -use crate::{ - current::{with_labels_for_task, Labels, TaskId}, - runtime::execution::ExecutionState, -}; +use std::task::Waker; use super::{constant_stepped::ConstantSteppedTimeModel, Duration, Instant, TimeModel}; /// A time model where time does not advance unless forced +#[derive(Debug, Clone)] pub struct FrozenTimeModel { inner: ConstantSteppedTimeModel, - expired: HashSet, - #[allow(clippy::type_complexity)] - triggers: Vec bool>>, -} - -impl std::fmt::Debug for FrozenTimeModel { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FrozenTimeModel") - .field("inner", &self.inner) - .field("expired", &self.expired) - .field("triggers", &format!("[{} triggers]", self.triggers.len())) - .finish() - } } impl FrozenTimeModel { @@ -32,78 +13,31 @@ impl FrozenTimeModel { pub fn new() -> Self { Self::default() } - - /// Expire all timeouts on tasks that satisfy a predicate - pub fn trigger_timeouts(&mut self, trigger: F) - where - F: Fn(&Labels) -> bool + 'static, - { - let num_tasks = ExecutionState::with(|s| s.num_tasks()); - for i in 0..num_tasks { - let task_id = TaskId::from(i); - with_labels_for_task(task_id, |labels| { - if trigger(labels) { - self.expired.insert(task_id); - } - }); - } - - let mut to_wake = Vec::new(); - for Reverse((_, task_id, sleep_id)) in self.inner.get_waiters() { - if self.expired.contains(task_id) { - to_wake.push(*sleep_id); - } - } - - for sleep_id in to_wake { - self.inner.wake_frozen(sleep_id); - } - - self.triggers.push(Box::new(trigger)); - } - - /// Clear all triggers that expire timeouts - pub fn clear_triggers(&mut self) { - self.expired.clear(); - self.triggers.clear(); - } } impl Default for FrozenTimeModel { fn default() -> Self { Self { inner: ConstantSteppedTimeModel::new(std::time::Duration::ZERO), - expired: HashSet::new(), - triggers: Vec::new(), - } - } -} - -impl Clone for FrozenTimeModel { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - expired: self.expired.clone(), - triggers: Vec::new(), // Don't clone triggers } } } impl TimeModel for FrozenTimeModel { fn pause(&mut self) { - warn!("Pausing frozen model has no effect") + self.inner.pause(); } fn resume(&mut self) { - warn!("Resuming frozen model has no effect") + self.inner.resume(); } - fn step(&mut self) {} + fn step(&mut self) { + self.inner.step(); + } fn new_execution(&mut self) { self.inner.new_execution(); - self.expired.clear(); - self.triggers.clear(); } fn instant(&self) -> Instant { @@ -119,23 +53,6 @@ impl TimeModel for FrozenTimeModel { } fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Option) -> bool { - let task_id = ExecutionState::me(); - for trigger in &self.triggers { - with_labels_for_task(task_id, |labels| { - if trigger(labels) { - self.expired.insert(task_id); - } - }); - } - - if !self.expired.contains(&task_id) { - self.inner.register_sleep(deadline, sleep_id, waker) - } else { - true - } - } - - fn as_any_mut(&mut self) -> &mut dyn std::any::Any { - self + self.inner.register_sleep(deadline, sleep_id, waker) } } diff --git a/shuttle/src/time/mod.rs b/shuttle/src/time/mod.rs index 4061f93b..8ecc16b5 100644 --- a/shuttle/src/time/mod.rs +++ b/shuttle/src/time/mod.rs @@ -16,11 +16,9 @@ use pin_project::pin_project; use std::task::{Context, Poll, Waker}; use tracing::warn; -use crate::current::Labels; use crate::runtime::execution::ExecutionState; use crate::runtime::thread; -use crate::time::frozen::FrozenTimeModel; /// Constant stepped time model implementation pub mod constant_stepped; @@ -74,9 +72,6 @@ pub trait TimeModel: std::fmt::Debug { /// the caller is polling whether it is currently expired but is not yet performing a blocking /// sleep. fn register_sleep(&mut self, deadline: Instant, id: u64, waker: Option) -> bool; - - /// Downcast to Any for type casting / checking - fn as_any_mut(&mut self) -> &mut dyn std::any::Any; } /// Provides a reference to the current TimeModel for this execution. @@ -85,34 +80,6 @@ pub fn get_time_model() -> Rc> { ExecutionState::with(|s| Rc::clone(&s.time_model)) } -/// Expire all current timeouts/sleeps requested by tasks whose tags match the -/// given predicate. May not be implemented by all TimeModels. -pub fn trigger_timeouts(trigger: F) -where - F: Fn(&Labels) -> bool + 'static, -{ - match get_time_model() - .borrow_mut() - .as_any_mut() - .downcast_mut::() - { - Some(model) => model.trigger_timeouts(trigger), - None => warn!("trigger_timeouts is only available for the default FrozenTimeModel"), - } -} - -/// Remove all triggers to expire timeouts -pub fn clear_triggers() { - match get_time_model() - .borrow_mut() - .as_any_mut() - .downcast_mut::() - { - Some(model) => model.clear_triggers(), - None => warn!("trigger_timeouts is only available for the default FrozenTimeModel"), - } -} - #[cfg(feature = "advanced-time-models")] mod advanced_duration { use super::*; From 7e38147915b35ca4da29e5c5709ed31ed7bfcc58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Thu, 5 Feb 2026 18:38:36 -0800 Subject: [PATCH 6/8] Remove distribution sample in ConstantSteppedTimeModel --- shuttle/src/time/constant_stepped.rs | 33 ++++------------------------ 1 file changed, 4 insertions(+), 29 deletions(-) diff --git a/shuttle/src/time/constant_stepped.rs b/shuttle/src/time/constant_stepped.rs index 21a8d187..45f698e1 100644 --- a/shuttle/src/time/constant_stepped.rs +++ b/shuttle/src/time/constant_stepped.rs @@ -8,13 +8,12 @@ use tracing::{trace, warn}; use crate::{current::TaskId, runtime::execution::ExecutionState}; -use super::{Duration, Instant, TimeDistribution, TimeModel}; +use super::{Duration, Instant, TimeModel}; /// A time model where time advances by a constant amount for each scheduling step #[derive(Clone, Debug)] pub struct ConstantSteppedTimeModel { - distribution: ConstantTimeDistribution, - current_step_size: std::time::Duration, + step_size: std::time::Duration, current_time_elapsed: std::time::Duration, waiters: BinaryHeap>, wakers: HashMap, @@ -25,10 +24,8 @@ unsafe impl Send for ConstantSteppedTimeModel {} impl ConstantSteppedTimeModel { /// Create a ConstantSteppedTimeModel pub fn new(step_size: std::time::Duration) -> Self { - let distribution = ConstantTimeDistribution::new(step_size); Self { - distribution, - current_step_size: distribution.sample(), + step_size, current_time_elapsed: std::time::Duration::from_secs(0), waiters: BinaryHeap::new(), wakers: HashMap::new(), @@ -79,7 +76,6 @@ impl TimeModel for ConstantSteppedTimeModel { } fn new_execution(&mut self) { - self.current_step_size = self.distribution.sample(); self.current_time_elapsed = std::time::Duration::from_secs(0); self.waiters.clear(); self.wakers.clear(); @@ -100,9 +96,8 @@ impl TimeModel for ConstantSteppedTimeModel { true } - #[allow(clippy::useless_conversion)] fn advance(&mut self, dur: Duration) { - self.current_time_elapsed += dur.into(); + self.current_time_elapsed += dur; } fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Option) -> bool { @@ -120,23 +115,3 @@ impl TimeModel for ConstantSteppedTimeModel { false } } - -/// A constant distribution; each sample returns the same time -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] -pub struct ConstantTimeDistribution { - /// The time that will be returned on sampling - pub time: std::time::Duration, -} - -impl ConstantTimeDistribution { - /// Create a new constant time distribution - pub fn new(time: std::time::Duration) -> Self { - Self { time } - } -} - -impl TimeDistribution for ConstantTimeDistribution { - fn sample(&self) -> std::time::Duration { - self.time - } -} From 1b901284a27b43243cdfde00ee72e5a25be1c908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Thu, 5 Feb 2026 19:34:23 -0800 Subject: [PATCH 7/8] Refactor register_sleep --- shuttle/src/runtime/execution.rs | 4 +-- shuttle/src/time/constant_stepped.rs | 48 +++++++++++----------------- shuttle/src/time/frozen.rs | 4 ++- shuttle/src/time/mod.rs | 45 ++++++++++++++------------ 4 files changed, 48 insertions(+), 53 deletions(-) diff --git a/shuttle/src/runtime/execution.rs b/shuttle/src/runtime/execution.rs index 976703c8..308b088a 100644 --- a/shuttle/src/runtime/execution.rs +++ b/shuttle/src/runtime/execution.rs @@ -142,8 +142,8 @@ impl StepError { /// result in a newly runnable task when woken. Requires access to the ExecutionState to obtain a reference to the /// time model. fn wake_sleepers_until_runnable() { - let tm = get_time_model(); - while ExecutionState::num_runnable() == 0 && tm.borrow_mut().wake_next() {} + let time_model = get_time_model(); + while ExecutionState::num_runnable() == 0 && time_model.borrow_mut().wake_next() {} } impl Execution { diff --git a/shuttle/src/time/constant_stepped.rs b/shuttle/src/time/constant_stepped.rs index 45f698e1..d406cb79 100644 --- a/shuttle/src/time/constant_stepped.rs +++ b/shuttle/src/time/constant_stepped.rs @@ -6,7 +6,7 @@ use std::{ use tracing::{trace, warn}; -use crate::{current::TaskId, runtime::execution::ExecutionState}; +use crate::{current::TaskId, runtime::execution::ExecutionState, time::WakerRegistered}; use super::{Duration, Instant, TimeModel}; @@ -32,7 +32,9 @@ impl ConstantSteppedTimeModel { } } - fn unblock_expired(&mut self) { + // Unblocks expired. Returns `true` if any were woken. + fn unblock_expired(&mut self) -> bool { + let mut out = false; while let Some(waker_key) = self.waiters.peek().and_then(|Reverse((t, _, sleep_id))| { if *t <= self.current_time_elapsed { Some(*sleep_id) @@ -44,19 +46,9 @@ impl ConstantSteppedTimeModel { if let Some(waker) = self.wakers.remove(&waker_key) { waker.wake(); } + out = true } - } - - /// Get the currently sleeping tasks and deadlines. May contain duplicates - pub fn get_waiters(&self) -> &[Reverse<(std::time::Duration, TaskId, u64)>] { - self.waiters.as_slice() - } - - /// Manually wake a task without affecting the global clock - pub fn wake_frozen(&mut self, sleep_id: u64) { - if let Some(waker) = self.wakers.remove(&sleep_id) { - waker.wake(); - } + out } } @@ -70,7 +62,7 @@ impl TimeModel for ConstantSteppedTimeModel { } fn step(&mut self) { - self.current_time_elapsed += self.current_step_size; + self.current_time_elapsed += self.step_size; trace!("time step to {:?}", self.current_time_elapsed); self.unblock_expired(); } @@ -86,32 +78,30 @@ impl TimeModel for ConstantSteppedTimeModel { } fn wake_next(&mut self) -> bool { - if self.waiters.is_empty() { - return false; - } if let Some(Reverse((time, _, _))) = self.waiters.peek() { self.current_time_elapsed = max(self.current_time_elapsed, *time); + self.unblock_expired(); + true + } else { + false } - self.unblock_expired(); - true } fn advance(&mut self, dur: Duration) { self.current_time_elapsed += dur; } - fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Option) -> bool { + fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Waker) -> WakerRegistered { let deadline = deadline.unwrap_simulated(); if deadline <= self.current_time_elapsed { - return true; + return WakerRegistered::NotRegistered; } - if let Some(waker) = waker { - let task_id = ExecutionState::with(|s| s.current().id()); - let item = (deadline, task_id, sleep_id); - self.waiters.push(Reverse(item)); - self.wakers.insert(sleep_id, waker); - } - false + let task_id = ExecutionState::with(|s| s.current().id()); + let item = (deadline, task_id, sleep_id); + self.waiters.push(Reverse(item)); + self.wakers.insert(sleep_id, waker); + + WakerRegistered::Registered } } diff --git a/shuttle/src/time/frozen.rs b/shuttle/src/time/frozen.rs index 7a190dd5..8ab60424 100644 --- a/shuttle/src/time/frozen.rs +++ b/shuttle/src/time/frozen.rs @@ -1,5 +1,7 @@ use std::task::Waker; +use crate::time::WakerRegistered; + use super::{constant_stepped::ConstantSteppedTimeModel, Duration, Instant, TimeModel}; /// A time model where time does not advance unless forced @@ -52,7 +54,7 @@ impl TimeModel for FrozenTimeModel { self.inner.advance(dur); } - fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Option) -> bool { + fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Waker) -> WakerRegistered { self.inner.register_sleep(deadline, sleep_id, waker) } } diff --git a/shuttle/src/time/mod.rs b/shuttle/src/time/mod.rs index 8ecc16b5..efbb6144 100644 --- a/shuttle/src/time/mod.rs +++ b/shuttle/src/time/mod.rs @@ -43,6 +43,15 @@ pub trait TimeDistribution { fn sample(&self) -> D; } +/// Whether or not a waker was registered on call to `register_sleep` +#[derive(Debug)] +pub enum WakerRegistered { + /// Waker was registered (ie. caller can return `Poll::Pending`) + Registered, + /// Waker was not registered (ie. the time duration has elapsed) + NotRegistered, +} + /// The trait implemented by each TimeModel pub trait TimeModel: std::fmt::Debug { /// Wake the next sleeping task; returns true if there exists a task that was able to be woken. @@ -68,10 +77,8 @@ pub trait TimeModel: std::fmt::Debug { fn advance(&mut self, duration: Duration); /// Callback for registering a sleep/timeout on the current task. It is up to the TimeModel - /// implementation to determine when to wake the sleeping task. If no waker is provided, then - /// the caller is polling whether it is currently expired but is not yet performing a blocking - /// sleep. - fn register_sleep(&mut self, deadline: Instant, id: u64, waker: Option) -> bool; + /// implementation to determine when to wake the sleeping task. + fn register_sleep(&mut self, deadline: Instant, id: u64, waker: Waker) -> WakerRegistered; } /// Provides a reference to the current TimeModel for this execution. @@ -604,16 +611,12 @@ impl Future for Sleep { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let is_expired = get_time_model() + match get_time_model() .borrow_mut() - .register_sleep(self.deadline, self.id, None); - if is_expired { - Poll::Ready(()) - } else { - let _ = get_time_model() - .borrow_mut() - .register_sleep(self.deadline, self.id, Some(cx.waker().clone())); - Poll::Pending + .register_sleep(self.deadline, self.id, cx.waker().clone()) + { + WakerRegistered::Registered => Poll::Pending, + WakerRegistered::NotRegistered => Poll::Ready(()), } } } @@ -773,23 +776,23 @@ where type Output = std::result::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let now = Instant::now(); let this = self.project(); - let tm = get_time_model(); - let expired = tm.borrow_mut().register_sleep(*this.deadline, *this.id, None); - if expired { + if *this.deadline <= now { return Poll::Ready(Err(Elapsed(()))); } + let time_model = get_time_model(); match this.future.poll(cx) { Poll::Pending => { - let expired = tm + match time_model .borrow_mut() - .register_sleep(*this.deadline, *this.id, Some(cx.waker().clone())); - if expired { - return Poll::Ready(Err(Elapsed(()))); + .register_sleep(*this.deadline, *this.id, cx.waker().clone()) + { + WakerRegistered::NotRegistered => Poll::Ready(Err(Elapsed(()))), + WakerRegistered::Registered => Poll::Pending, } - Poll::Pending } Poll::Ready(x) => Poll::Ready(Ok(x)), } From bf73929187f50b517d22640e14af8fdb29109897 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Thu, 5 Feb 2026 19:37:48 -0800 Subject: [PATCH 8/8] Remove trigger timeouts tests --- shuttle/tests/basic/time.rs | 173 +----------------------------------- 1 file changed, 1 insertion(+), 172 deletions(-) diff --git a/shuttle/tests/basic/time.rs b/shuttle/tests/basic/time.rs index 0e447f00..86a338f2 100644 --- a/shuttle/tests/basic/time.rs +++ b/shuttle/tests/basic/time.rs @@ -1,8 +1,6 @@ -use shuttle::current::{me, set_label_for_task}; use shuttle::scheduler::{DfsScheduler, RandomScheduler}; use shuttle::time::constant_stepped::ConstantSteppedTimeModel; -use shuttle::time::frozen::FrozenTimeModel; -use shuttle::time::{async_interval, async_sleep, async_timeout, clear_triggers, trigger_timeouts, Duration, Instant}; +use shuttle::time::{async_interval, async_sleep, async_timeout, Duration, Instant}; use shuttle::{future, thread, Config, Runner}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -149,9 +147,6 @@ fn test_async_interval() { }); } -#[derive(Clone, Debug, PartialEq)] -struct TaskType(String); - /// Probabilistically force a switch for random schedulers [P(no switch) = 1/T^bound for T threads] /// Returns true if the condition ever holds while spinning. fn spin_switch_and_get_any(condition: F) -> bool @@ -187,169 +182,3 @@ fn test_stepped_sleep_woken_by_thread_steps() { assert!(spin_switch_and_get_any(|| sleep_completed.load(Ordering::SeqCst))); }); } - -#[test] -fn test_frozen_trigger_timeouts() { - let time_model = FrozenTimeModel::new(); - let scheduler = RandomScheduler::new(10); - let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); - - runner.run(|| { - let woken = Arc::new(std::sync::atomic::AtomicBool::new(false)); - let woken_clone = woken.clone(); - - let handle = thread::spawn(move || { - set_label_for_task(me(), TaskType("target".to_string())); - - future::block_on(async { - async_sleep(Duration::from_millis(100)).await; - woken_clone.store(true, Ordering::SeqCst); - }); - }); - - // Verify the task has not completed yet because it is asleep - assert!(!spin_switch_and_get_any(|| woken.load(Ordering::SeqCst))); - - // Trigger timeouts for tasks with "target" label - trigger_timeouts(|labels| labels.get::().is_some_and(|t| t.0 == "target")); - - // Verify the task completed after trigger_timeouts - assert!(spin_switch_and_get_any(|| woken.load(Ordering::SeqCst))); - - // Wait for the task to complete - handle.join().unwrap(); - }); -} - -#[test] -fn test_frozen_trigger_timeouts_async_timeout() { - let time_model = FrozenTimeModel::new(); - let scheduler = RandomScheduler::new(10); - let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); - - runner.run(|| { - let timeout_triggered = Arc::new(std::sync::atomic::AtomicBool::new(false)); - let timeout_triggered_clone = timeout_triggered.clone(); - - let handle = thread::spawn(move || { - set_label_for_task(me(), TaskType("target".to_string())); - - future::block_on(async { - let result = async_timeout(Duration::from_millis(100), async { - async_sleep(Duration::from_millis(200)).await; - 42 - }) - .await; - - timeout_triggered_clone.store(result.is_err(), Ordering::SeqCst); - }); - }); - - // Verify the timeout has not triggered yet - assert!(!spin_switch_and_get_any(|| timeout_triggered.load(Ordering::SeqCst))); - - // Trigger timeouts for tasks with "target" label - trigger_timeouts(|labels| labels.get::().is_some_and(|t| t.0 == "target")); - - // Verify the timeout was triggered - assert!(spin_switch_and_get_any(|| timeout_triggered.load(Ordering::SeqCst))); - - // Wait for the task to complete - handle.join().unwrap(); - }); -} - -#[test] -fn test_frozen_trigger_timeouts_selective() { - let time_model = FrozenTimeModel::new(); - let scheduler = RandomScheduler::new(10); - let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); - - runner.run(|| { - let other_woken = Arc::new(std::sync::atomic::AtomicBool::new(false)); - - let other_woken_clone = other_woken.clone(); - - let other_handle = thread::spawn(move || { - set_label_for_task(me(), TaskType("other".to_string())); - - future::block_on(async { - async_sleep(Duration::from_millis(100)).await; - other_woken_clone.store(true, Ordering::SeqCst); - }); - }); - - // Verify the task hasn't completed yet - assert!(!spin_switch_and_get_any(|| other_woken.load(Ordering::SeqCst))); - - // Trigger timeouts only for "target" tasks, not "other" tasks - trigger_timeouts(|labels| labels.get::().is_some_and(|t| t.0 == "target")); - - // Verify the "other" task is still sleeping (not woken by selective trigger) - assert!(!spin_switch_and_get_any(|| other_woken.load(Ordering::SeqCst))); - - other_handle.join().unwrap(); - }); -} - -#[test] -fn test_frozen_trigger_timeouts_before_timeout_created() { - let time_model = FrozenTimeModel::new(); - let scheduler = RandomScheduler::new(10); - let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); - - runner.run(|| { - let timeout_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); - let timeout_expired_clone = timeout_expired.clone(); - - // Trigger timeouts before creating any timeout - trigger_timeouts(|_| true); - - let _handle = thread::spawn(move || { - future::block_on(async { - let start = Instant::now(); - let result = async_timeout(Duration::from_millis(100), async { - async_sleep(Duration::from_millis(200)).await; - 42 - }) - .await; - - // Timeout should expire immediately without advancing time - assert!(result.is_err()); - assert_eq!(start.elapsed(), Duration::ZERO); - timeout_expired_clone.store(true, Ordering::SeqCst); - }); - }); - - assert!(spin_switch_and_get_any(|| timeout_expired.load(Ordering::SeqCst))); - }); -} - -#[test] -fn test_frozen_clear_triggers() { - let time_model = FrozenTimeModel::new(); - let scheduler = RandomScheduler::new(10); - let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); - - runner.run(|| { - trigger_timeouts(|_| true); - clear_triggers(); - - let handle = thread::spawn(move || { - future::block_on(async { - let start = Instant::now(); - let result = async_timeout(Duration::from_millis(100), async { - async_sleep(Duration::from_millis(200)).await; - 42 - }) - .await; - - // Timeout should expire by advancing time when no other threads are runnable - assert!(result.is_err()); - assert_eq!(start.elapsed().as_millis(), 100); - }); - }); - - handle.join().unwrap(); - }); -}