diff --git a/shuttle/Cargo.toml b/shuttle/Cargo.toml index 23cc9f91..074d9913 100644 --- a/shuttle/Cargo.toml +++ b/shuttle/Cargo.toml @@ -15,6 +15,7 @@ bitvec = "1.0.1" cfg-if = "1.0" hex = "0.4.2" owo-colors = "3.5.0" +pin-project = "1.1.3" rand_core = "0.6.4" rand = "0.8.6" rand_pcg = "0.3.1" @@ -39,7 +40,6 @@ tempfile = "3.2.0" test-log = { version = "0.2.8", default-features = false, features = ["trace"] } tracing-subscriber = { version = "0.3.9", features = ["env-filter"] } trybuild = "1.0" -pin-project = "1.1.3" # The following line is necessary to ensure vector-clocks are used for integration tests # To run performance tests without vector clocks using `cargo bench`, this line must be commented out shuttle = { path = ".", features = ["vector-clocks"] } @@ -56,6 +56,7 @@ annotation = ["dep:serde", "dep:serde_json", "dep:regex"] # are otherwise always enabled via a dev-dependency to ensure all *test* assertions utilizing vector # clocks behave correctly during testing bench-no-vector-clocks = [] +advanced-time-models = [] [[bench]] name = "lock" diff --git a/shuttle/src/lib.rs b/shuttle/src/lib.rs index 75eb0857..f0743174 100644 --- a/shuttle/src/lib.rs +++ b/shuttle/src/lib.rs @@ -187,6 +187,7 @@ pub mod lazy_static; pub mod rand; pub mod sync; pub mod thread; +pub mod time; pub mod current; pub mod scheduler; diff --git a/shuttle/src/runtime/execution.rs b/shuttle/src/runtime/execution.rs index 1b063445..308b088a 100644 --- a/shuttle/src/runtime/execution.rs +++ b/shuttle/src/runtime/execution.rs @@ -8,6 +8,7 @@ use crate::runtime::thread::continuation::PooledContinuation; use crate::scheduler::{Schedule, Scheduler}; use crate::sync::{ResourceSignature, ResourceType}; use crate::thread::thread_fn; +use crate::time::{get_time_model, TimeModel}; use crate::{backtrace_enabled, Config, MaxSteps, UNGRACEFUL_SHUTDOWN_CONFIG}; use scoped_tls::scoped_thread_local; use smallvec::SmallVec; @@ -91,15 +92,21 @@ thread_local! { /// static variable, but clients get access to it by calling `ExecutionState::with`. pub(crate) struct Execution { scheduler: Rc>, + time_model: Rc>, initial_schedule: Schedule, } impl Execution { /// Construct a new execution that will use the given scheduler. The execution should then be /// invoked via its `run` method, which takes as input the closure for task 0. - pub(crate) fn new(scheduler: Rc>, initial_schedule: Schedule) -> Self { + pub(crate) fn new( + scheduler: Rc>, + initial_schedule: Schedule, + time_model: Rc>, + ) -> Self { Self { scheduler, + time_model, initial_schedule, } } @@ -130,6 +137,15 @@ impl StepError { } } +/// While there are no runnable tasks and tasks are able to be woken by the time model, continually wakes tasks. +/// The TimeModel's `wake_next` method is called in a loop in case there are stale wakers which do not actually +/// result in a newly runnable task when woken. Requires access to the ExecutionState to obtain a reference to the +/// time model. +fn wake_sleepers_until_runnable() { + let time_model = get_time_model(); + while ExecutionState::num_runnable() == 0 && time_model.borrow_mut().wake_next() {} +} + impl Execution { /// Run a function to be tested, taking control of scheduling it and any tasks it might spawn. /// This function runs until `f` and all tasks spawned by `f` have terminated, or until the @@ -138,7 +154,11 @@ impl Execution { where F: FnOnce() + Send + 'static, { - let state = RefCell::new(ExecutionState::new(config.clone(), Rc::clone(&self.scheduler))); + let state = RefCell::new(ExecutionState::new( + config.clone(), + Rc::clone(&self.scheduler), + Rc::clone(&self.time_model), + )); init_panic_hook(config.clone()); CurrentSchedule::init(self.initial_schedule.clone()); @@ -245,6 +265,7 @@ impl Execution { #[inline] fn run_to_competion(&mut self, immediately_return_on_panic: bool) -> Result<(), StepError> { loop { + wake_sleepers_until_runnable(); let next_step: Option>> = ExecutionState::with(|state| { state.schedule()?; state.advance_to_next_task(); @@ -340,6 +361,10 @@ pub(crate) struct ExecutionState { // Persistent Vec used as a bump allocator for references to runnable tasks to avoid slow allocation // on each scheduling decision. Should not be used outside of the `schedule` function runnable_tasks: Vec<*const Task>, + + // Counter for unique timing resource ids (Sleeps, Timeouts and Intervals) + pub(crate) timer_id_counter: u64, + pub(crate) time_model: Rc>, } #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -373,7 +398,7 @@ pub enum ExecutionStateBorrowError { } impl ExecutionState { - fn new(config: Config, scheduler: Rc>) -> Self { + fn new(config: Config, scheduler: Rc>, time_model: Rc>) -> Self { Self { config, tasks: SmallVec::new(), @@ -389,6 +414,8 @@ impl ExecutionState { has_cleaned_up: false, top_level_span: tracing::Span::current(), runnable_tasks: Vec::with_capacity(DEFAULT_INLINE_TASKS), + time_model, + timer_id_counter: 0, } } @@ -665,6 +692,8 @@ impl ExecutionState { TASK_ID_TO_TAGS.with(|cell| cell.borrow_mut().clear()); LABELS.with(|cell| cell.borrow_mut().clear()); + Self::with(|s| s.timer_id_counter = 0); + #[cfg(debug_assertions)] Self::with(|state| state.has_cleaned_up = true); @@ -680,6 +709,8 @@ impl ExecutionState { /// is different from the currently running task, indicating that the current task should yield /// its execution. pub(crate) fn maybe_yield() -> bool { + wake_sleepers_until_runnable(); + Self::with(|state| { if std::thread::panicking() && !state.in_cleanup { return true; @@ -791,6 +822,10 @@ impl ExecutionState { ExecutionState::with(|s| s.current_mut().signature.new_resource(resource_type)) } + pub(crate) fn num_runnable() -> usize { + Self::with(|state| state.tasks.iter().filter(|t| t.runnable()).count()) + } + pub(crate) fn get_storage, T: 'static>(&self, key: K) -> Option<&T> { self.storage .get(key.into()) diff --git a/shuttle/src/runtime/runner.rs b/shuttle/src/runtime/runner.rs index 11ea0b42..c2a327fb 100644 --- a/shuttle/src/runtime/runner.rs +++ b/shuttle/src/runtime/runner.rs @@ -3,6 +3,7 @@ use crate::runtime::task::{Task, TaskId}; use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL}; use crate::scheduler::metrics::MetricsScheduler; use crate::scheduler::{Schedule, Scheduler}; +use crate::time::{frozen::FrozenTimeModel, TimeModel}; use crate::Config; use std::cell::RefCell; use std::fmt; @@ -52,18 +53,33 @@ impl Drop for ResetSpanOnDrop { /// function as many times as dictated by the scheduler; each execution has its scheduling decisions /// resolved by the scheduler, which can make different choices for each execution. #[derive(Debug)] -pub struct Runner { +pub struct Runner { scheduler: Rc>>, + time_model: Rc>, config: Config, } -impl Runner { +impl Runner { /// Construct a new `Runner` that will use the given `Scheduler` to control the test. pub fn new(scheduler: S, config: Config) -> Self { let metrics_scheduler = MetricsScheduler::new(scheduler); Self { scheduler: Rc::new(RefCell::new(metrics_scheduler)), + time_model: Rc::new(RefCell::new(FrozenTimeModel::new())), + config, + } + } +} + +impl Runner { + /// Construct a new `Runner` that will use the given `Scheduler` to control the test. + pub fn new_with_time_model(scheduler: S, time_model: T, config: Config) -> Self { + let metrics_scheduler = MetricsScheduler::new(scheduler); + + Self { + scheduler: Rc::new(RefCell::new(metrics_scheduler)), + time_model: Rc::new(RefCell::new(time_model)), config, } } @@ -95,8 +111,9 @@ impl Runner { None => break, Some(s) => s, }; + self.time_model.borrow_mut().new_execution(); - let execution = Execution::new(self.scheduler.clone(), schedule); + let execution = Execution::new(self.scheduler.clone(), schedule, self.time_model.clone()); let f = Arc::clone(&f); // This is a slightly lazy way to ensure that everything outside of the "execution" span gets @@ -122,7 +139,6 @@ pub struct PortfolioRunner { stop_on_first_failure: bool, config: Config, } - impl PortfolioRunner { /// Construct a new `PortfolioRunner` with no schedulers. If `stop_on_first_failure` is true, /// all schedulers will be terminated as soon as any fails; if false, they will keep running diff --git a/shuttle/src/runtime/thread/continuation.rs b/shuttle/src/runtime/thread/continuation.rs index 5c6f2bdc..84520eb2 100644 --- a/shuttle/src/runtime/thread/continuation.rs +++ b/shuttle/src/runtime/thread/continuation.rs @@ -1,4 +1,5 @@ use crate::runtime::execution::ExecutionState; +use crate::time::get_time_model; use crate::{ContinuationFunctionBehavior, UNGRACEFUL_SHUTDOWN_CONFIG}; use corosensei::Yielder; use corosensei::{stack::DefaultStack, Coroutine, CoroutineResult}; @@ -342,6 +343,7 @@ pub(crate) fn switch() { ContinuationInput::Resume => {} }; } + get_time_model().borrow_mut().step() } #[cfg(test)] diff --git a/shuttle/src/thread.rs b/shuttle/src/thread.rs index 5d945ac3..de7316e9 100644 --- a/shuttle/src/thread.rs +++ b/shuttle/src/thread.rs @@ -3,11 +3,13 @@ use crate::runtime::execution::ExecutionState; use crate::runtime::task::TaskId; use crate::runtime::thread; +use crate::time::Duration; +use std::fmt::Debug; use std::marker::PhantomData; use std::panic::Location; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::time::Duration; +pub use crate::time::sleep; pub use std::thread::{panicking, Result}; /// A unique identifier for a running thread @@ -348,12 +350,6 @@ pub fn yield_now() { thread::switch(); } -/// Puts the current thread to sleep for at least the specified amount of time. -// Note that Shuttle does not model time, so this behaves just like a context switch. -pub fn sleep(_dur: Duration) { - thread::switch(); -} - /// Get a handle to the thread that invokes it pub fn current() -> Thread { let (task_id, name) = ExecutionState::with(|s| { diff --git a/shuttle/src/time/constant_stepped.rs b/shuttle/src/time/constant_stepped.rs new file mode 100644 index 00000000..d406cb79 --- /dev/null +++ b/shuttle/src/time/constant_stepped.rs @@ -0,0 +1,107 @@ +use std::{ + cmp::{max, Reverse}, + collections::{BinaryHeap, HashMap}, + task::Waker, +}; + +use tracing::{trace, warn}; + +use crate::{current::TaskId, runtime::execution::ExecutionState, time::WakerRegistered}; + +use super::{Duration, Instant, TimeModel}; + +/// A time model where time advances by a constant amount for each scheduling step +#[derive(Clone, Debug)] +pub struct ConstantSteppedTimeModel { + step_size: std::time::Duration, + current_time_elapsed: std::time::Duration, + waiters: BinaryHeap>, + wakers: HashMap, +} + +unsafe impl Send for ConstantSteppedTimeModel {} + +impl ConstantSteppedTimeModel { + /// Create a ConstantSteppedTimeModel + pub fn new(step_size: std::time::Duration) -> Self { + Self { + step_size, + current_time_elapsed: std::time::Duration::from_secs(0), + waiters: BinaryHeap::new(), + wakers: HashMap::new(), + } + } + + // Unblocks expired. Returns `true` if any were woken. + fn unblock_expired(&mut self) -> bool { + let mut out = false; + while let Some(waker_key) = self.waiters.peek().and_then(|Reverse((t, _, sleep_id))| { + if *t <= self.current_time_elapsed { + Some(*sleep_id) + } else { + None + } + }) { + _ = self.waiters.pop(); + if let Some(waker) = self.wakers.remove(&waker_key) { + waker.wake(); + } + out = true + } + out + } +} + +impl TimeModel for ConstantSteppedTimeModel { + fn pause(&mut self) { + warn!("Pausing stepped model has no effect") + } + + fn resume(&mut self) { + warn!("Resuming stepped model has no effect") + } + + fn step(&mut self) { + self.current_time_elapsed += self.step_size; + trace!("time step to {:?}", self.current_time_elapsed); + self.unblock_expired(); + } + + fn new_execution(&mut self) { + self.current_time_elapsed = std::time::Duration::from_secs(0); + self.waiters.clear(); + self.wakers.clear(); + } + + fn instant(&self) -> Instant { + Instant::Simulated(self.current_time_elapsed) + } + + fn wake_next(&mut self) -> bool { + if let Some(Reverse((time, _, _))) = self.waiters.peek() { + self.current_time_elapsed = max(self.current_time_elapsed, *time); + self.unblock_expired(); + true + } else { + false + } + } + + fn advance(&mut self, dur: Duration) { + self.current_time_elapsed += dur; + } + + fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Waker) -> WakerRegistered { + let deadline = deadline.unwrap_simulated(); + if deadline <= self.current_time_elapsed { + return WakerRegistered::NotRegistered; + } + + let task_id = ExecutionState::with(|s| s.current().id()); + let item = (deadline, task_id, sleep_id); + self.waiters.push(Reverse(item)); + self.wakers.insert(sleep_id, waker); + + WakerRegistered::Registered + } +} diff --git a/shuttle/src/time/frozen.rs b/shuttle/src/time/frozen.rs new file mode 100644 index 00000000..8ab60424 --- /dev/null +++ b/shuttle/src/time/frozen.rs @@ -0,0 +1,60 @@ +use std::task::Waker; + +use crate::time::WakerRegistered; + +use super::{constant_stepped::ConstantSteppedTimeModel, Duration, Instant, TimeModel}; + +/// A time model where time does not advance unless forced +#[derive(Debug, Clone)] +pub struct FrozenTimeModel { + inner: ConstantSteppedTimeModel, +} + +impl FrozenTimeModel { + /// Create a new Frozen time model + pub fn new() -> Self { + Self::default() + } +} + +impl Default for FrozenTimeModel { + fn default() -> Self { + Self { + inner: ConstantSteppedTimeModel::new(std::time::Duration::ZERO), + } + } +} + +impl TimeModel for FrozenTimeModel { + fn pause(&mut self) { + self.inner.pause(); + } + + fn resume(&mut self) { + self.inner.resume(); + } + + fn step(&mut self) { + self.inner.step(); + } + + fn new_execution(&mut self) { + self.inner.new_execution(); + } + + fn instant(&self) -> Instant { + self.inner.instant() + } + + fn wake_next(&mut self) -> bool { + self.inner.wake_next() + } + + fn advance(&mut self, dur: Duration) { + self.inner.advance(dur); + } + + fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Waker) -> WakerRegistered { + self.inner.register_sleep(deadline, sleep_id, waker) + } +} diff --git a/shuttle/src/time/mod.rs b/shuttle/src/time/mod.rs new file mode 100644 index 00000000..efbb6144 --- /dev/null +++ b/shuttle/src/time/mod.rs @@ -0,0 +1,800 @@ +//! Time +//! +//! Timing primitives allow Shuttle tests to interact with wall-clock time (Instant, Duration, Timeout, etc.) in a deterministic manner + +use std::cmp::Ordering; +use std::future::Future; +#[cfg(feature = "advanced-time-models")] +use std::ops::Mul; +use std::ops::{Add, AddAssign, Sub, SubAssign}; + +use std::{cell::RefCell, rc::Rc}; + +use std::pin::Pin; + +use pin_project::pin_project; +use std::task::{Context, Poll, Waker}; +use tracing::warn; + +use crate::runtime::execution::ExecutionState; + +use crate::runtime::thread; + +/// Constant stepped time model implementation +pub mod constant_stepped; +/// Frozen time model implementation +pub mod frozen; + +/// Returns the current count of created timeout/sleep futures +pub fn timer_count() -> u64 { + ExecutionState::with(|state| state.timer_id_counter) +} + +fn increment_timer_counter() -> u64 { + ExecutionState::with(|state| { + state.timer_id_counter += 1; + state.timer_id_counter + }) +} + +/// A distribution of times which can be sampled +pub trait TimeDistribution { + /// Sample a duration from the given distribution + fn sample(&self) -> D; +} + +/// Whether or not a waker was registered on call to `register_sleep` +#[derive(Debug)] +pub enum WakerRegistered { + /// Waker was registered (ie. caller can return `Poll::Pending`) + Registered, + /// Waker was not registered (ie. the time duration has elapsed) + NotRegistered, +} + +/// The trait implemented by each TimeModel +pub trait TimeModel: std::fmt::Debug { + /// Wake the next sleeping task; returns true if there exists a task that was able to be woken. + /// Called when all tasks are blocked to resolve timing based deadlocks (all unblocked tasks are sleeping). + fn wake_next(&mut self) -> bool; + + /// Reset the TimeModel state for the next Shuttle iteration + fn new_execution(&mut self); + + /// Callback after each scheduling step to allow the TimeModel to update itself + fn step(&mut self); + + /// Used to create the TimeModel's Instant struct in functions like Instant::now() + fn instant(&self) -> Instant; + + /// Pauses the TimeModel + fn pause(&mut self); + + /// Resumes the TimeModel + fn resume(&mut self); + + /// Manually advances the TimeModel's clock by a fixed amount + fn advance(&mut self, duration: Duration); + + /// Callback for registering a sleep/timeout on the current task. It is up to the TimeModel + /// implementation to determine when to wake the sleeping task. + fn register_sleep(&mut self, deadline: Instant, id: u64, waker: Waker) -> WakerRegistered; +} + +/// Provides a reference to the current TimeModel for this execution. +/// Uses `Rc::clone` so that ExecutionState isn't already borrowed when running most TimeModel methods +pub fn get_time_model() -> Rc> { + ExecutionState::with(|s| Rc::clone(&s.time_model)) +} + +#[cfg(feature = "advanced-time-models")] +mod advanced_duration { + use super::*; + + /// A Shuttle duration + #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] + pub enum Duration { + /// A concrete duration value + Std(std::time::Duration), + } + + impl Default for Duration { + fn default() -> Self { + Duration::Std(std::time::Duration::ZERO) + } + } + + impl Duration { + /// The maximum duration. + pub const MAX: Duration = Duration::Std(std::time::Duration::MAX); + /// Zero duration. + pub const ZERO: Duration = Duration::Std(std::time::Duration::ZERO); + + /// Creates a new Duration from the specified number of seconds. + pub fn from_secs(secs: u64) -> Self { + Duration::Std(std::time::Duration::from_secs(secs)) + } + + /// Creates a new Duration from the specified number of milliseconds. + pub fn from_millis(millis: u64) -> Self { + Duration::Std(std::time::Duration::from_millis(millis)) + } + + /// Creates a new Duration from the specified number of microseconds. + pub fn from_micros(micros: u64) -> Self { + Duration::Std(std::time::Duration::from_micros(micros)) + } + + /// Creates a new Duration from the specified number of nanoseconds. + pub fn from_nanos(nanos: u64) -> Self { + Duration::Std(std::time::Duration::from_nanos(nanos)) + } + + /// Returns the total number of nanoseconds contained by this Duration. + pub fn as_nanos(&self) -> u128 { + match self { + Duration::Std(d) => d.as_nanos(), + } + } + + /// Returns the total number of microseconds contained by this Duration. + pub fn as_micros(&self) -> u128 { + self.as_nanos() / 1000 + } + + /// Returns the total number of milliseconds contained by this Duration. + pub fn as_millis(&self) -> u128 { + self.as_micros() / 1000 + } + + /// Checked Duration addition. Computes self + other, returning None if overflow occurred. + pub fn checked_add(&self, other: Duration) -> Option { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => a.checked_add(b).map(Duration::Std), + } + } + + /// Checked Duration subtraction. Computes self - other, returning None if other is greater than self. + pub fn checked_sub(&self, other: Duration) -> Option { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => a.checked_sub(b).map(Duration::Std), + } + } + + /// Checked Duration multiplication. Computes self * other, returning None if overflow occurred. + pub fn checked_mul(&self, b: u32) -> Option { + match self { + Duration::Std(a) => a.checked_mul(b).map(Duration::Std), + } + } + + /// Checked Duration division. Computes self / other, returning None if other is zero or overflow occurred. + pub fn checked_div(&self, rhs: u32) -> Option { + match self { + Duration::Std(a) => a.checked_div(rhs).map(Duration::Std), + } + } + + /// Saturating Duration addition. Computes self + other, returning Duration::MAX if overflow occurred. + pub fn saturating_add(&self, other: Duration) -> Self { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => Duration::Std(a.saturating_add(b)), + } + } + + /// Saturating Duration subtraction. Computes self - other, returning Duration::ZERO if other is greater than self. + pub fn saturating_sub(&self, other: Duration) -> Self { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => Duration::Std(a.saturating_sub(b)), + } + } + + /// Saturating Duration multiplication. Computes self * other, returning Duration::MAX if overflow occurred. + pub fn saturating_mul(&self, rhs: u32) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.saturating_mul(rhs)), + } + } + + /// Multiplies Duration by f32. + pub fn mul_f32(&self, rhs: f32) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.mul_f32(rhs)), + } + } + + /// Multiplies Duration by f64. + pub fn mul_f64(&self, rhs: f64) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.mul_f64(rhs)), + } + } + + /// Divides Duration by f32. + pub fn div_f32(&self, rhs: f32) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.div_f32(rhs)), + } + } + + /// Divides Duration by f64. + pub fn div_f64(&self, rhs: f64) -> Self { + match self { + Duration::Std(a) => Duration::Std(a.div_f64(rhs)), + } + } + + /// Returns the number of seconds contained by this Duration as f64. + pub fn as_secs_f64(&self) -> f64 { + match self { + Duration::Std(d) => d.as_secs_f64(), + } + } + + /// Returns the number of seconds contained by this Duration as f32. + pub fn as_secs_f32(&self) -> f32 { + match self { + Duration::Std(d) => d.as_secs_f32(), + } + } + + /// Returns the total number of whole seconds contained by this Duration. + pub fn as_secs(&self) -> u64 { + match self { + Duration::Std(d) => d.as_secs(), + } + } + + /// Returns the fractional part of this Duration, in whole milliseconds. + pub fn subsec_millis(&self) -> u32 { + match self { + Duration::Std(d) => d.subsec_millis(), + } + } + + /// Returns the fractional part of this Duration, in whole microseconds. + pub fn subsec_micros(&self) -> u32 { + match self { + Duration::Std(d) => d.subsec_micros(), + } + } + + /// Returns the fractional part of this Duration, in nanoseconds. + pub fn subsec_nanos(&self) -> u32 { + match self { + Duration::Std(d) => d.subsec_nanos(), + } + } + + /// Creates a new Duration from the specified number of whole seconds and additional nanoseconds. + pub fn new(secs: u64, nanos: u32) -> Self { + Duration::Std(std::time::Duration::new(secs, nanos)) + } + + /// Creates a new Duration from the specified number of seconds represented as f64. + pub fn from_secs_f64(secs: f64) -> Self { + Duration::Std(std::time::Duration::from_secs_f64(secs)) + } + + /// Creates a new Duration from the specified number of seconds represented as f32. + pub fn from_secs_f32(secs: f32) -> Self { + Duration::Std(std::time::Duration::from_secs_f32(secs)) + } + + pub(crate) fn unwrap_std(self) -> std::time::Duration { + match self { + Duration::Std(d) => d, + } + } + } + + impl Ord for Duration { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (Duration::Std(a), Duration::Std(b)) => a.cmp(b), + } + } + } + + impl PartialOrd for Duration { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Add for Duration { + type Output = Duration; + + fn add(self, other: Self) -> Self { + self.checked_add(other).unwrap() + } + } + + impl AddAssign for Duration { + fn add_assign(&mut self, other: Self) { + *self = self.checked_add(other).unwrap() + } + } + + impl SubAssign for Duration { + fn sub_assign(&mut self, other: Self) { + *self = self.checked_sub(other).unwrap() + } + } + + impl Mul for Duration { + type Output = Duration; + + fn mul(self, other: u32) -> Self { + self.checked_mul(other).unwrap() + } + } + + impl Mul for u32 { + type Output = Duration; + + fn mul(self, other: Duration) -> Duration { + other.checked_mul(self).unwrap() + } + } + + impl std::ops::Div for Duration { + type Output = Duration; + + fn div(self, rhs: u32) -> Duration { + self.checked_div(rhs).unwrap() + } + } + + impl std::ops::DivAssign for Duration { + fn div_assign(&mut self, rhs: u32) { + *self = self.checked_div(rhs).unwrap() + } + } + + impl From for Duration { + fn from(d: std::time::Duration) -> Self { + Duration::Std(d) + } + } + + impl From for std::time::Duration { + fn from(d: Duration) -> Self { + d.unwrap_std() + } + } + + impl std::ops::Sub for Duration { + type Output = Duration; + + fn sub(self, other: Self) -> Self { + self.checked_sub(other).unwrap() + } + } +} + +#[cfg(feature = "advanced-time-models")] +pub use advanced_duration::Duration; + +#[cfg(not(feature = "advanced-time-models"))] +pub use std::time::Duration; + +/// A Shuttle Instant +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum Instant { + /// Deterministically simulated clock time represented by a Duration from the start of the test + Simulated(std::time::Duration), +} + +#[cfg(feature = "advanced-time-models")] +impl Instant { + /// Returns Some(t) where t is the time self - duration if t can be represented as Instant (which means it’s inside the bounds of the underlying data structure), None otherwise. + pub fn checked_sub(&self, duration: Duration) -> Option { + match self { + Instant::Simulated(a) => match duration { + Duration::Std(b) => a.checked_sub(b).map(Instant::Simulated), + }, + } + } + + /// Returns the amount of time elapsed from another instant to this one, or None if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can return None. + pub fn checked_duration_since(&self, earlier: Instant) -> Option { + match (self, earlier) { + (Instant::Simulated(a), Instant::Simulated(b)) => a.checked_sub(b).map(Duration::Std), + } + } + + /// Returns the amount of time elapsed from another instant to this one, or panics if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can panic. + pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { + { + self.checked_duration_since(earlier).unwrap_or(Duration::ZERO) + } + } + + /// Returns Some(t) where t is the time self + duration if t can be represented as Instant (which means it's inside the bounds + /// of the underlying data structure), None otherwise. + pub fn checked_add(&self, duration: Duration) -> Option { + match self { + Instant::Simulated(a) => match duration { + Duration::Std(b) => a.checked_add(b).map(Instant::Simulated), + }, + } + } +} + +#[cfg(not(feature = "advanced-time-models"))] +impl Instant { + /// Returns Some(t) where t is the time self - duration if t can be represented as Instant (which means it’s inside the bounds of the underlying data structure), None otherwise. + pub fn checked_sub(&self, duration: Duration) -> Option { + match self { + Instant::Simulated(a) => a.checked_sub(duration).map(Instant::Simulated), + } + } + + /// Returns the amount of time elapsed from another instant to this one, or None if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can return None. + pub fn checked_duration_since(&self, earlier: Instant) -> Option { + match (self, earlier) { + (Instant::Simulated(a), Instant::Simulated(b)) => a.checked_sub(b), + } + } + + /// Returns the amount of time elapsed from another instant to this one, or panics if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can panic. + pub fn saturating_duration_since(&self, earlier: Instant) -> Duration { + { + self.checked_duration_since(earlier) + .unwrap_or(std::time::Duration::ZERO) + } + } + + /// Returns Some(t) where t is the time self + duration if t can be represented as Instant (which means it's inside the bounds + /// of the underlying data structure), None otherwise. + pub fn checked_add(&self, duration: Duration) -> Option { + match self { + Instant::Simulated(a) => a.checked_add(duration).map(Instant::Simulated), + } + } +} + +impl Instant { + /// Returns an instant corresponding to “now”. + pub fn now() -> Self { + get_time_model().borrow().instant() + } + + /// Cast this instant to a simulated time represented by a std::time::Duration + pub fn unwrap_simulated(self) -> std::time::Duration { + match self { + Instant::Simulated(d) => d, + } + } + + /// Returns the amount of time elapsed from another instant to this one, or panics if that instant is later than this one. + /// Due to monotonicity bugs, even under correct logical ordering of the passed Instants, this method can panic. + pub fn duration_since(&self, earlier: Instant) -> Duration { + self.checked_duration_since(earlier).unwrap() + } + + /// Returns the amount of time elapsed since this instant. + /// Previous Rust versions panicked when the current time was earlier than self. Currently this method returns a Duration of + /// zero in that case. Future versions may reintroduce the panic. + pub fn elapsed(&self) -> Duration { + Instant::now() + .checked_duration_since(*self) + .unwrap_or(Duration::from_secs(0)) + } + + /// Returns t where t is the time self + duration if t can be represented as Instant otherwise it saturates to the maximum time value + pub fn saturating_add(&self, duration: Duration) -> Self { + match self { + Instant::Simulated(_) => self.checked_add(duration).unwrap_or(Instant::Simulated(Duration::MAX)), + } + } +} + +impl Add for Instant { + type Output = Instant; + + fn add(self, other: Duration) -> Instant { + self.checked_add(other).unwrap() + } +} + +impl AddAssign for Instant { + fn add_assign(&mut self, other: Duration) { + *self = self.checked_add(other).unwrap() + } +} + +impl Sub for Instant { + type Output = Duration; + + fn sub(self, earlier: Instant) -> Duration { + self.checked_duration_since(earlier).unwrap() + } +} + +impl SubAssign for Instant { + fn sub_assign(&mut self, other: Duration) { + *self = *self - other + } +} + +impl Sub for Instant { + type Output = Instant; + + fn sub(self, other: Duration) -> Instant { + self.checked_sub(other).unwrap() + } +} + +impl Ord for Instant { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (Instant::Simulated(a), Instant::Simulated(b)) => a.cmp(b), + } + } +} + +impl PartialOrd for Instant { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// Puts the current thread to sleep +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn sleep(dur: Duration) { + if dur == Duration::ZERO { + thread::switch(); + return; + } + crate::future::block_on(async_sleep(dur)); +} + +/// Advances the current global time without putting the current thread to sleep +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn advance(dur: Duration) { + get_time_model().borrow_mut().advance(dur); + thread::switch(); +} + +/// Returns a future which sleeps until the duration has elapsed +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn async_sleep(dur: Duration) -> Sleep { + async_sleep_until(Instant::now().saturating_add(dur)) +} + +/// Returns a future which sleeps until the deadline is reached +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn async_sleep_until(deadline: Instant) -> Sleep { + let id = increment_timer_counter(); + Sleep { id, deadline } +} + +/// Returns a struct which sleeps repeatedly at a fixed time intervals (a tokio::time::Interval) +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn async_interval(dur: Duration) -> Interval { + Interval { + start: None, + ticks: 0, + current_interval: None, + period: dur, + } +} + +/// Returns a struct which sleeps repeatedly at a fixed time intervals (a tokio::time::Interval) +/// This Interval starts at a fixed start time. +/// Behavior of this function depends on the TimeModel provided to Shuttle +pub fn async_interval_at(start: Instant, period: Duration) -> Interval { + Interval { + start: Some(start), + ticks: 0, + current_interval: None, + period, + } +} + +/// A future which returns Poll::Pending until its deadline +#[pin_project] +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Sleep { + id: u64, + deadline: Instant, +} + +impl Future for Sleep { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match get_time_model() + .borrow_mut() + .register_sleep(self.deadline, self.id, cx.waker().clone()) + { + WakerRegistered::Registered => Poll::Pending, + WakerRegistered::NotRegistered => Poll::Ready(()), + } + } +} + +impl Sleep { + /// Returns the instant at which the future will complete. + pub fn deadline(&self) -> Instant { + self.deadline + } + + /// Returns `true` if `Sleep` has elapsed. + /// + /// A `Sleep` instance is elapsed when the requested duration has elapsed. + pub fn is_elapsed(&self) -> bool { + self.deadline.checked_duration_since(Instant::now()).is_none() + } + + /// Resets the `Sleep` instance to a new deadline. + pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + let me = self.project(); + *me.deadline = deadline; + } +} + +/// Timeout a future +#[pin_project] +#[derive(Debug)] +pub struct Interval { + start: Option, + ticks: u32, + period: Duration, + current_interval: Option>>, +} + +/// Missed tick behavior for Interval +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MissedTickBehavior { + /// Ticks as fast as possible until caught up. + Burst, + /// Tick at multiples of period from when tick was called, rather than from start. + Delay, + /// Skips missed ticks and tick on the next multiple of period from start. + Skip, +} + +impl Interval { + /// tick + pub async fn tick(&mut self) -> Instant { + let deadline = self.next_deadline(); + async_sleep_until(deadline).await; + self.ticks += 1; + deadline + } + + /// period for an Interval + pub fn period(&self) -> Duration { + self.period + } + + fn next_deadline(&mut self) -> Instant { + if let Some(start) = self.start { + let mut total_duration = Duration::ZERO; + total_duration += self.period * self.ticks; + start.checked_add(total_duration).unwrap() + } else { + let now = Instant::now(); + self.start = Some(now); + now + } + } + + /// poll tick + pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll { + let deadline = self.next_deadline(); + if self.current_interval.is_none() { + self.current_interval = Some(Box::pin(async_sleep_until(deadline))); + } + + match self.current_interval.as_mut().unwrap().as_mut().poll(cx) { + Poll::Ready(_) => { + self.current_interval = None; + Poll::Ready(deadline) + } + Poll::Pending => Poll::Pending, + } + } + + /// reset + pub fn reset(&mut self) { + self.start = None; + self.ticks = 0; + if let Some(x) = self.current_interval.as_mut() { + x.as_mut().reset(Instant::now()); + } + } + + /// Unimplemented + pub fn set_missed_tick_behavior(&mut self, _behavior: MissedTickBehavior) { + warn!("set missed tick behavior unimplemented: no effect!"); + } + + /// Unimplemented + pub fn missed_tick_behavior(&mut self) -> MissedTickBehavior { + warn!("set missed tick behavior unimplemented: no effect!"); + MissedTickBehavior::Burst + } +} + +/// Timeout a future +pub fn async_timeout(d: Duration, f: F) -> Timeout +where + F: Future, +{ + let id = increment_timer_counter(); + Timeout { + id, + deadline: Instant::now().saturating_add(d), + future: f, + } +} + +/// Timeout a future +#[pin_project] +#[derive(Debug)] +pub struct Timeout +where + F: Future, +{ + id: u64, + deadline: Instant, + #[pin] + future: F, +} + +/// Elapsed time error variant +#[derive(Debug, PartialEq, Eq)] +pub struct Elapsed(()); + +impl std::fmt::Display for Elapsed { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + "deadline has elapsed".fmt(fmt) + } +} + +impl std::error::Error for Elapsed {} + +impl From for std::io::Error { + fn from(_err: Elapsed) -> std::io::Error { + std::io::ErrorKind::TimedOut.into() + } +} + +impl Future for Timeout +where + F: Future, +{ + type Output = std::result::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let now = Instant::now(); + let this = self.project(); + + if *this.deadline <= now { + return Poll::Ready(Err(Elapsed(()))); + } + + let time_model = get_time_model(); + match this.future.poll(cx) { + Poll::Pending => { + match time_model + .borrow_mut() + .register_sleep(*this.deadline, *this.id, cx.waker().clone()) + { + WakerRegistered::NotRegistered => Poll::Ready(Err(Elapsed(()))), + WakerRegistered::Registered => Poll::Pending, + } + } + Poll::Ready(x) => Poll::Ready(Ok(x)), + } + } +} diff --git a/shuttle/tests/basic/mod.rs b/shuttle/tests/basic/mod.rs index 88bd38d1..b193b898 100644 --- a/shuttle/tests/basic/mod.rs +++ b/shuttle/tests/basic/mod.rs @@ -20,6 +20,7 @@ mod shrink; mod tag; mod task; mod thread; +mod time; mod timeout; mod tracing; mod uncontrolled_nondeterminism; diff --git a/shuttle/tests/basic/pct.rs b/shuttle/tests/basic/pct.rs index 1ff4f6a3..4defc795 100644 --- a/shuttle/tests/basic/pct.rs +++ b/shuttle/tests/basic/pct.rs @@ -1,10 +1,10 @@ use shuttle::scheduler::PctScheduler; use shuttle::sync::Mutex; +use shuttle::time::Duration; use shuttle::{check_pct, check_random, thread, Config, MaxSteps, Runner}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::Duration; use test_log::test; const TEST_LENGTH: usize = 20; @@ -18,7 +18,7 @@ fn figure5() { thread::spawn(move || { for _ in 0..TEST_LENGTH { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } *lock_clone.lock().unwrap() = 1; @@ -78,7 +78,7 @@ fn yield_spin_loop(use_yield: bool) { if use_yield { thread::yield_now(); } else { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } } }); @@ -110,21 +110,21 @@ fn figure1a_pct() { thread::spawn(move || { for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } *t1.lock().unwrap() = Some(1); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); thread::spawn(move || { for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let _ = t2.lock().unwrap().expect("null dereference"); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); }); @@ -141,7 +141,7 @@ fn figure1b(num_threads: usize) { for _ in 0..num_threads - 2 { thread::spawn(|| { for _ in 0..5 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); } @@ -149,17 +149,17 @@ fn figure1b(num_threads: usize) { // Main worker threads take 10 steps each thread::spawn(move || { for _ in 0..5 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } *x1.lock().unwrap() = None; for _ in 0..4 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); thread::spawn(move || { for _ in 0..4 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let b = { let b = x2.lock().unwrap().is_some(); @@ -169,7 +169,7 @@ fn figure1b(num_threads: usize) { let _ = x2.lock().unwrap().expect("null dereference"); } for _ in 0..4 { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } }); } @@ -216,30 +216,30 @@ fn figure_1c() { thread::spawn(move || { for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let a = a1.lock().unwrap(); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let b = b1.lock().unwrap(); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } assert_eq!(*a + *b, 0) }); thread::spawn(move || { for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let b = b2.lock().unwrap(); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } let a = a2.lock().unwrap(); for _ in 0..COUNT { - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::ZERO); } assert_eq!(*a + *b, 0); }); diff --git a/shuttle/tests/basic/time.rs b/shuttle/tests/basic/time.rs new file mode 100644 index 00000000..86a338f2 --- /dev/null +++ b/shuttle/tests/basic/time.rs @@ -0,0 +1,184 @@ +use shuttle::scheduler::{DfsScheduler, RandomScheduler}; +use shuttle::time::constant_stepped::ConstantSteppedTimeModel; +use shuttle::time::{async_interval, async_sleep, async_timeout, Duration, Instant}; +use shuttle::{future, thread, Config, Runner}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tracing::trace; + +#[test] +fn test_stepped_blocking_sleep() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + let start = Instant::now(); + thread::sleep(Duration::from_millis(50) + Duration::from_millis(50)); + let elapsed = start.elapsed(); + assert!(elapsed.as_millis() == 100); + }); +} + +/// This test includes 3 scheduling points which should advance the global time: +/// - 1 spawn and 1 yield on the main thread +/// - 1 yield on the child thread +/// +/// Thus with a stepped time, it should be possible for either side of the condition to be met: +/// - True => both threads have executed (3 events * time_step) +/// - False => only the main thread has executed (2 events * time_step) +#[test] +fn test_stepped_elapsed_time() { + let less_than_count = Arc::new(AtomicUsize::new(0)); + let greater_than_count = Arc::new(AtomicUsize::new(0)); + + let time_step = Duration::from_micros(10); + + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = DfsScheduler::new(None, false); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + + let less_count_inner = less_than_count.clone(); + let greater_count_inner = greater_than_count.clone(); + + runner.run(move || { + let start = Instant::now(); + thread::spawn(|| { + thread::yield_now(); + }); + thread::yield_now(); + let elapsed = start.elapsed(); + trace!("elapsed {:?}", elapsed); + + if elapsed > 2 * time_step { + greater_count_inner.fetch_add(1, Ordering::SeqCst); + } else { + less_count_inner.fetch_add(1, Ordering::SeqCst); + } + }); + + let less_count = less_than_count.load(Ordering::SeqCst); + let greater_count = greater_than_count.load(Ordering::SeqCst); + + assert!( + less_count > 0, + "Expected some executions with elapsed <= {:?}, got {}", + time_step * 2, + less_count + ); + assert!( + greater_count > 0, + "Expected some executions with elapsed > {:?}, got {}", + time_step * 2, + greater_count + ); +} + +#[test] +fn test_stepped_async_sleep() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + future::block_on(async { + let start = Instant::now(); + async_sleep(Duration::from_millis(50)).await; + let elapsed = start.elapsed(); + assert_eq!(elapsed.as_millis(), 50); + }); + }); +} + +#[test] +fn test_stepped_timeout_expired() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + future::block_on(async { + let start = Instant::now(); + let result = async_timeout(Duration::from_millis(50), async { + async_sleep(Duration::from_millis(100)).await; + 42 + }) + .await; + assert!(result.is_err()); + assert_eq!(start.elapsed().as_millis(), 50); + }); + }); +} + +#[test] +fn test_stepped_timeout_not_expired() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + future::block_on(async { + let result = async_timeout(Duration::from_millis(50), async { + async_sleep(Duration::from_millis(20)).await; + 42 + }) + .await; + assert_eq!(result.unwrap(), 42); + }); + }); +} + +#[test] +fn test_async_interval() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_micros(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + runner.run(|| { + future::block_on(async { + let mut interval = async_interval(Duration::from_millis(10)); + let start = Instant::now(); + interval.tick().await; + let first_tick = start.elapsed(); + interval.tick().await; + let second_tick = start.elapsed(); + interval.tick().await; + let third_tick = start.elapsed(); + + assert_eq!(first_tick.as_millis(), 0); + assert_eq!(second_tick.as_millis(), 10); + assert_eq!(third_tick.as_millis(), 20); + }); + }); +} + +/// Probabilistically force a switch for random schedulers [P(no switch) = 1/T^bound for T threads] +/// Returns true if the condition ever holds while spinning. +fn spin_switch_and_get_any(condition: F) -> bool +where + F: Fn() -> bool, +{ + let bound = 10000; + for _ in 0..bound { + thread::yield_now(); + if condition() { + return true; + }; + } + false +} + +#[test] +fn test_stepped_sleep_woken_by_thread_steps() { + let time_model = ConstantSteppedTimeModel::new(std::time::Duration::from_millis(10)); + let scheduler = RandomScheduler::new(10); + let runner = Runner::new_with_time_model(scheduler, time_model, Config::new()); + + runner.run(|| { + let sleep_completed = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let sleep_completed_clone = sleep_completed.clone(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + sleep_completed_clone.store(true, Ordering::SeqCst); + }); + + // Take many steps to advance time and wake the sleeping thread + assert!(spin_switch_and_get_any(|| sleep_completed.load(Ordering::SeqCst))); + }); +}