diff --git a/shuttle/src/future/batch_semaphore.rs b/shuttle/src/future/batch_semaphore.rs index e437d0f7..42e8d927 100644 --- a/shuttle/src/future/batch_semaphore.rs +++ b/shuttle/src/future/batch_semaphore.rs @@ -1,7 +1,7 @@ //! A counting semaphore supporting both async and sync operations. use crate::current; use crate::runtime::execution::ExecutionState; -use crate::runtime::task::{clock::VectorClock, TaskId}; +use crate::runtime::task::{clock::VectorClock, Event, TaskId}; use crate::runtime::thread; use crate::sync::{ResourceSignature, ResourceType}; use std::cell::RefCell; @@ -289,7 +289,6 @@ impl BatchSemaphoreState { pub struct BatchSemaphore { state: RefCell, fairness: Fairness, - #[allow(unused)] signature: ResourceSignature, } @@ -392,8 +391,9 @@ impl BatchSemaphore { /// Closes the semaphore. This prevents the semaphore from issuing new /// permits and notifies all pending waiters. + #[track_caller] pub fn close(&self) { - thread::switch(); + thread::switch(Event::batch_semaphore_rel(&self.signature)); self.init_object_id(); let mut state = self.state.borrow_mut(); @@ -436,8 +436,9 @@ impl BatchSemaphore { /// If the permits are available, returns Ok(()) /// If the semaphore is closed, returns `Err(TryAcquireError::Closed)` /// If there aren't enough permits, returns `Err(TryAcquireError::NoPermits)` + #[track_caller] pub fn try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError> { - thread::switch(); + thread::switch(Event::batch_semaphore_acq(&self.signature)); self.init_object_id(); let mut state = self.state.borrow_mut(); @@ -538,6 +539,7 @@ impl BatchSemaphore { } /// Acquire the specified number of permits (async API) + #[track_caller] pub fn acquire(&self, num_permits: usize) -> Acquire<'_> { // No switch here; switch should be triggered on polling future self.init_object_id(); @@ -545,6 +547,7 @@ impl BatchSemaphore { } /// Acquire the specified number of permits (blocking API) + #[track_caller] pub fn acquire_blocking(&self, num_permits: usize) -> Result<(), AcquireError> { // No switch here; switch should be triggered on polling future self.init_object_id(); @@ -552,8 +555,9 @@ impl BatchSemaphore { } /// Release `num_permits` back to the Semaphore + #[track_caller] pub fn release(&self, num_permits: usize) { - thread::switch(); + thread::switch(Event::batch_semaphore_rel(&self.signature)); self.init_object_id(); if num_permits == 0 { @@ -645,6 +649,7 @@ pub struct Acquire<'a> { } impl<'a> Acquire<'a> { + #[track_caller] fn new(semaphore: &'a BatchSemaphore, num_permits: usize) -> Self { let waiter = Arc::new(Waiter::new(num_permits)); Self { @@ -689,7 +694,7 @@ impl Future for Acquire<'_> { let blocking_is_not_commutative = self.semaphore.fairness == Fairness::StrictlyFair; if self.never_polled && (will_succeed || blocking_is_not_commutative) { - thread::switch(); + thread::switch(Event::batch_semaphore_acq(&self.semaphore.signature)); } self.never_polled = false; @@ -778,12 +783,25 @@ impl Future for Acquire<'_> { self.waiter.is_queued.store(true, Ordering::SeqCst); } trace!("Acquire::poll for waiter {:?} that is enqueued", self.waiter); + + let event = Event::batch_semaphore_acq(&self.semaphore.signature); + // SAFETY: This is safe because the current task immediately suspends after this future + // returns Poll::Pending (src/future/mod.rs). Whenever a task resumes, the `next_event` + // is unset, so there is no opportunity to corrupt the reference to our signature while + // it is set as the `next_task`. + ExecutionState::with(|s| unsafe { s.current_mut().set_next_event(event) }); Poll::Pending } Err(TryAcquireError::Closed) => unreachable!(), } } else { // No progress made, future is still pending. + let event = Event::batch_semaphore_acq(&self.semaphore.signature); + // SAFETY: This is safe because the current task immediately suspends after this future + // returns Poll::Pending (src/future/mod.rs). Whenever a task resumes, the `next_event` + // is unset, so there is no opportunity to corrupt the reference to our signature while + // it is set as the `next_task`. + ExecutionState::with(|s| unsafe { s.current_mut().set_next_event(event) }); Poll::Pending } } diff --git a/shuttle/src/future/mod.rs b/shuttle/src/future/mod.rs index b3116afa..a137ba23 100644 --- a/shuttle/src/future/mod.rs +++ b/shuttle/src/future/mod.rs @@ -249,7 +249,7 @@ pub fn block_on(future: F) -> F::Output { Poll::Ready(result) => break result, Poll::Pending => { ExecutionState::with(|state| state.current_mut().sleep_unless_woken()); - thread::switch(); + thread::switch_keeping_current_event(); } } } diff --git a/shuttle/src/runtime/execution.rs b/shuttle/src/runtime/execution.rs index 3fbebfee..ce4b7a3b 100644 --- a/shuttle/src/runtime/execution.rs +++ b/shuttle/src/runtime/execution.rs @@ -2,7 +2,7 @@ use crate::runtime::failure::{init_panic_hook, persist_failure}; use crate::runtime::storage::{StorageKey, StorageMap}; use crate::runtime::task::clock::VectorClock; use crate::runtime::task::labels::Labels; -use crate::runtime::task::{ChildLabelFn, Task, TaskId, TaskName, TaskSignature, DEFAULT_INLINE_TASKS}; +use crate::runtime::task::{ChildLabelFn, Event, Task, TaskId, TaskName, TaskSignature, DEFAULT_INLINE_TASKS}; use crate::runtime::thread; use crate::runtime::thread::continuation::PooledContinuation; use crate::scheduler::{Schedule, Scheduler}; @@ -531,7 +531,8 @@ impl ExecutionState { where F: Future + 'static, { - thread::switch(); + let signature = ExecutionState::with(|state| state.current_mut().signature.new_child(caller)); + thread::switch(Event::Spawn(&signature)); let task_id = Self::with(|state| { let schedule_len = CurrentSchedule::len(); let parent_span_id = state.top_level_span.id(); @@ -554,7 +555,7 @@ impl ExecutionState { schedule_len, tag, Some(state.current().id()), - state.current_mut().signature.new_child(caller), + signature, ); state.tasks.push(task); @@ -574,7 +575,8 @@ impl ExecutionState { mut initial_clock: Option, caller: &'static Location<'static>, ) -> TaskId { - thread::switch(); + let signature = ExecutionState::with(|state| state.current_mut().signature.new_child(caller)); + thread::switch(Event::Spawn(&signature)); let task_id = Self::with(|state| { let parent_span_id = state.top_level_span.id(); let task_id = TaskId(state.tasks.len()); @@ -601,7 +603,7 @@ impl ExecutionState { CurrentSchedule::len(), tag, Some(state.current().id()), - state.current_mut().signature.new_child(caller), + signature, ); state.tasks.push(task); @@ -658,8 +660,10 @@ impl ExecutionState { if std::thread::panicking() && !state.in_cleanup { return true; } + debug_assert!( - matches!(state.current_task, ScheduledTask::Some(_)) && state.next_task == ScheduledTask::None, + matches!(state.current_task, ScheduledTask::Some(_) | ScheduledTask::Finished) + && state.next_task == ScheduledTask::None, "we're inside a task and scheduler should not yet have run" ); @@ -750,6 +754,10 @@ impl ExecutionState { self.tasks.get(id.0) } + pub(crate) fn try_current_mut(&mut self) -> Option<&mut Task> { + self.tasks.get_mut(self.current_task.id()?.0) + } + pub(crate) fn in_cleanup(&self) -> bool { self.in_cleanup } @@ -881,7 +889,7 @@ impl ExecutionState { .scheduler .borrow_mut() .next_task(task_refs, self.current_task.id(), is_yielding) - .map(ScheduledTask::Some) + .map(|task| ScheduledTask::Some(task.id())) .unwrap_or(ScheduledTask::Stopped); // Tracing this `in_scope` is purely a matter of taste. We do it because diff --git a/shuttle/src/runtime/runner.rs b/shuttle/src/runtime/runner.rs index 11ea0b42..8aa77f86 100644 --- a/shuttle/src/runtime/runner.rs +++ b/shuttle/src/runtime/runner.rs @@ -237,12 +237,12 @@ impl Scheduler for PortfolioStoppableScheduler { } } - fn next_task( + fn next_task<'a>( &mut self, - runnable_tasks: &[&Task], + runnable_tasks: &'a [&'a Task], current_task: Option, is_yielding: bool, - ) -> Option { + ) -> Option<&'a Task> { if self.stop_signal.load(Ordering::SeqCst) { None } else { diff --git a/shuttle/src/runtime/task/mod.rs b/shuttle/src/runtime/task/mod.rs index c2de013b..e509be8b 100644 --- a/shuttle/src/runtime/task/mod.rs +++ b/shuttle/src/runtime/task/mod.rs @@ -255,6 +255,132 @@ impl PartialEq for TaskSignature { impl Eq for TaskSignature {} +pub(crate) type Loc = &'static Location<'static>; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) enum Event<'a> { + AtomicRead(&'a ResourceSignature, Loc), + AtomicWrite(&'a ResourceSignature, Loc), + AtomicReadWrite(&'a ResourceSignature, Loc), + BatchSemaphoreAcq(&'a ResourceSignature, Loc), + BatchSemaphoreRel(&'a ResourceSignature, Loc), + BarrierWait(&'a ResourceSignature, Loc), + CondvarWait(&'a ResourceSignature, Loc), + CondvarNotify(Loc), + Park(Loc), + Unpark(&'a TaskSignature, Loc), + ChannelSend(&'a ResourceSignature, Loc), + ChannelRecv(&'a ResourceSignature, Loc), + Spawn(&'a TaskSignature), + Yield(Loc), + Sleep(Loc), + Exit, + Join(&'a TaskSignature, Loc), + Unknown, +} + +impl<'a> std::fmt::Display for Event<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Event::AtomicRead(_, loc) => write!(f, "AtomicRead at {}", loc), + Event::AtomicWrite(_, loc) => write!(f, "AtomicWrite at {}", loc), + Event::AtomicReadWrite(_, loc) => write!(f, "AtomicReadWrite at {}", loc), + Event::BatchSemaphoreAcq(_, loc) => write!(f, "BatchSemaphoreAcq at {}", loc), + Event::BatchSemaphoreRel(_, loc) => write!(f, "BatchSemaphoreRel at {}", loc), + Event::BarrierWait(_, loc) => write!(f, "BarrierWait at {}", loc), + Event::CondvarWait(_, loc) => write!(f, "CondvarWait at {}", loc), + Event::CondvarNotify(loc) => write!(f, "CondvarNotify at {}", loc), + Event::Park(loc) => write!(f, "Park at {}", loc), + Event::Unpark(_, loc) => write!(f, "Unpark at {}", loc), + Event::ChannelSend(_, loc) => write!(f, "ChannelSend at {}", loc), + Event::ChannelRecv(_, loc) => write!(f, "ChannelRecv at {}", loc), + Event::Spawn(sig) => write!(f, "Spawn at {}", sig.task_creation_stack.last().unwrap().0), + Event::Yield(loc) => write!(f, "Yield at {}", loc), + Event::Sleep(loc) => write!(f, "Sleep at {}", loc), + Event::Exit => write!(f, "Exit"), + Event::Join(_, loc) => write!(f, "Join at {}", loc), + Event::Unknown => write!(f, "Unknown"), + } + } +} + +impl<'a> Event<'a> { + #[track_caller] + pub(crate) fn atomic_read(sig: &'a ResourceSignature) -> Self { + Self::AtomicRead(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn atomic_write(sig: &'a ResourceSignature) -> Self { + Self::AtomicWrite(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn atomic_read_write(sig: &'a ResourceSignature) -> Self { + Self::AtomicReadWrite(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn batch_semaphore_acq(sig: &'a ResourceSignature) -> Self { + Self::BatchSemaphoreAcq(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn batch_semaphore_rel(sig: &'a ResourceSignature) -> Self { + Self::BatchSemaphoreRel(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn barrier_wait(sig: &'a ResourceSignature) -> Self { + Self::BarrierWait(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn condvar_wait(sig: &'a ResourceSignature) -> Self { + Self::CondvarWait(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn condvar_notify() -> Self { + Self::CondvarNotify(Location::caller()) + } + + #[track_caller] + pub(crate) fn park() -> Self { + Self::Park(Location::caller()) + } + + #[track_caller] + pub(crate) fn unpark(sig: &'a TaskSignature) -> Self { + Self::Unpark(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn channel_send(sig: &'a ResourceSignature) -> Self { + Self::ChannelSend(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn channel_recv(sig: &'a ResourceSignature) -> Self { + Self::ChannelRecv(sig, Location::caller()) + } + + #[track_caller] + pub(crate) fn yield_now() -> Self { + Self::Yield(Location::caller()) + } + + #[track_caller] + pub(crate) fn sleep() -> Self { + Self::Sleep(Location::caller()) + } + + #[track_caller] + pub(crate) fn join(sig: &'a TaskSignature) -> Self { + Self::Join(sig, Location::caller()) + } +} + /// A `Task` represents a user-level unit of concurrency. Each task has an `id` that is unique within /// the execution, and a `state` reflecting whether the task is runnable (enabled) or not. #[derive(Debug)] @@ -276,6 +402,8 @@ pub struct Task { // Remember whether the waker was invoked while we were running woken: bool, + next_event: Event<'static>, + name: Option, local_storage: StorageMap, @@ -351,6 +479,7 @@ impl Task { waiter: None, waker, woken: false, + next_event: Event::Unknown, detached: false, park_state: ParkState::default(), name, @@ -425,7 +554,7 @@ impl Task { let cx = &mut Context::from_waker(&waker); while future.as_mut().poll(cx).is_pending() { ExecutionState::with(|state| state.current_mut().sleep_unless_woken()); - thread::switch(); + thread::switch_keeping_current_event(); } }), stack_size, @@ -679,6 +808,27 @@ impl Task { } ) } + + /// Get the next_event with a downcast lifetime tied to self. This prevents the caller from + /// borrowing as `'static`, which is only used to avoid borrow-checker issues and spurious + /// lifetime annotations on the Task struct + /// SAFETY: The borrowed lifetime should not extend past resuming the task after a switch + pub(crate) fn next_event(&self) -> &Event<'_> { + unsafe { std::mem::transmute(&self.next_event) } + } + + /// Transmutes an Event reference to a `'static` lifetime to avoid borrow checker issues when + /// switching coroutines + /// SAFETY: For this to be safe, the next event must be unset with `unset_next_event` before + /// the actual lifetime of the event expires. In general, as long as the event is unset when + /// the task is resumed, this will be safe. + pub(crate) unsafe fn set_next_event(&mut self, event: Event<'_>) { + self.next_event = unsafe { std::mem::transmute::, Event<'static>>(event) }; + } + + pub(crate) fn unset_next_event(&mut self) { + self.next_event = Event::Unknown; + } } #[derive(PartialEq, Eq, Clone, Copy, Debug)] diff --git a/shuttle/src/runtime/thread/continuation.rs b/shuttle/src/runtime/thread/continuation.rs index ac19a767..f45f9f2e 100644 --- a/shuttle/src/runtime/thread/continuation.rs +++ b/shuttle/src/runtime/thread/continuation.rs @@ -1,4 +1,5 @@ use crate::runtime::execution::ExecutionState; +use crate::runtime::task::Event; use corosensei::Yielder; use corosensei::{stack::DefaultStack, Coroutine, CoroutineResult}; use scoped_tls::scoped_thread_local; @@ -6,7 +7,6 @@ use std::cell::{Cell, RefCell}; use std::collections::VecDeque; use std::ops::Deref; use std::ops::DerefMut; -use std::panic::Location; use std::rc::Rc; use tracing::trace; @@ -164,9 +164,7 @@ impl Continuation { /// have Exited, are not reusable as they have broken out of the loop where their inner functions /// can be replaced. fn reusable(&self) -> bool { - self.state == ContinuationState::NotReady - || self.state == ContinuationState::FinishedIteration - || self.state == ContinuationState::Initialized + self.state == ContinuationState::NotReady || self.state == ContinuationState::FinishedIteration } } @@ -313,9 +311,29 @@ unsafe impl Send for PooledContinuation {} /// operation `Y1`, it suffices to check that `Y1` commutes with all operations `Z` on the same resource, /// as operations on other resources should commute trivially. #[track_caller] -pub(crate) fn switch() { +pub(crate) fn switch(event: Event<'_>) { + // SAFETY we cast the lifetime of the Event to 'static when embedding it into the current Task + // This is safe because (1) we have a valid reference to the Event's data for the scope of this function + // and (2) the static reference is dropped at the end of the scope of this function when the next_event + // is set back to Unknown in `switch_keep_event` + ExecutionState::with(|s| s.try_current_mut().map(|c| unsafe { c.set_next_event(event) })); + switch_keeping_current_event() +} + +/// This function should be identical to `continuation::switch` except that the burden of setting the next event on the current +/// task before switching is on the *caller* of this function. In contrast, `continuation::switch` takes the next event as an +/// argument and sets it for you. This is useful for futures, which yield by returning Poll::Pending rather than calling `switch` +/// explicitly, and thus can't pass the `next_event` as an argument. Instead, Futures can set the next event on the current task +/// before returning `Poll::Pending`, and the Shuttle async runtime will call `switch_keep_current_event` to preserve that event. +/// The next event is always unset by this function when the caller resumes/continues from the switch. +#[track_caller] +pub(crate) fn switch_keeping_current_event() { crate::annotations::record_tick(); - trace!("switch from {}", Location::caller()); + + trace!( + "switch from {:?}", + ExecutionState::with(|s| s.try_current_mut().map(|c| format!("{}", c.next_event()))) + ); if ExecutionState::maybe_yield() { let yielder = ExecutionState::with(|state| state.current().yielder); @@ -327,6 +345,7 @@ pub(crate) fn switch() { ContinuationInput::Resume => {} }; } + ExecutionState::with(|s| s.try_current_mut().map(|c| c.unset_next_event())); } #[cfg(test)] diff --git a/shuttle/src/runtime/thread/mod.rs b/shuttle/src/runtime/thread/mod.rs index d8068657..cc2bb003 100644 --- a/shuttle/src/runtime/thread/mod.rs +++ b/shuttle/src/runtime/thread/mod.rs @@ -1,3 +1,3 @@ pub(crate) mod continuation; - pub(crate) use continuation::switch; +pub(crate) use continuation::switch_keeping_current_event; diff --git a/shuttle/src/scheduler/annotation.rs b/shuttle/src/scheduler/annotation.rs index 162fec06..94edc6f8 100644 --- a/shuttle/src/scheduler/annotation.rs +++ b/shuttle/src/scheduler/annotation.rs @@ -23,14 +23,14 @@ impl Scheduler for AnnotationScheduler { self.0.new_execution() } - fn next_task( + fn next_task<'a>( &mut self, - runnable_tasks: &[&Task], + runnable_tasks: &'a [&'a Task], current_task: Option, is_yielding: bool, - ) -> Option { + ) -> Option<&'a Task> { let choice = self.0.next_task(runnable_tasks, current_task, is_yielding)?; - record_schedule(choice, runnable_tasks); + record_schedule(choice.id(), runnable_tasks); Some(choice) } diff --git a/shuttle/src/scheduler/dfs.rs b/shuttle/src/scheduler/dfs.rs index c07d1946..34037dfd 100644 --- a/shuttle/src/scheduler/dfs.rs +++ b/shuttle/src/scheduler/dfs.rs @@ -12,8 +12,8 @@ pub struct DfsScheduler { allow_random_data: bool, iterations: usize, - // Vec<(previous choice, was that the last choice at that level)> - levels: Vec<(TaskId, bool)>, + // Vec<(previous choice index, was that the last choice at that level)> + levels: Vec<(usize, bool)>, steps: usize, data_source: FixedDataSource, @@ -66,35 +66,39 @@ impl Scheduler for DfsScheduler { // TODO should we respect `is_yielding` by not allowing `current` to be scheduled next? That // TODO would be unsound but perhaps useful for validating some code - fn next_task(&mut self, runnable: &[&Task], _current: Option, _is_yielding: bool) -> Option { - let next = if self.steps >= self.levels.len() { + fn next_task<'a>( + &mut self, + runnable: &'a [&'a Task], + _current: Option, + _is_yielding: bool, + ) -> Option<&'a Task> { + let next_idx = if self.steps >= self.levels.len() { // First time we've reached this level assert_eq!(self.steps, self.levels.len()); - let to_run = runnable.first().unwrap().id(); - self.levels.push((to_run, runnable.len() == 1)); - to_run + let idx = 0; + self.levels.push((idx, runnable.len() == 1)); + idx } else { - let (last_choice, was_last) = self.levels[self.steps]; + let (last_choice_idx, was_last) = self.levels[self.steps]; if self.has_more_choices(self.steps + 1) { // Keep the same choice, because there's more work to do somewhere below us - last_choice + last_choice_idx } else { // Time to make a change at this level assert!( !was_last, "if we are making a change, there should be another available option" ); - let next_idx = runnable.iter().position(|t| t.id() == last_choice).unwrap() + 1; - let next = runnable[next_idx].id(); + let next_idx = last_choice_idx + 1; self.levels.drain(self.steps..); - self.levels.push((next, next_idx == runnable.len() - 1)); - next + self.levels.push((next_idx, next_idx == runnable.len() - 1)); + next_idx } }; self.steps += 1; - Some(next) + Some(runnable[next_idx]) } fn next_u64(&mut self) -> u64 { diff --git a/shuttle/src/scheduler/metrics.rs b/shuttle/src/scheduler/metrics.rs index 1ef1b160..3f0a3a87 100644 --- a/shuttle/src/scheduler/metrics.rs +++ b/shuttle/src/scheduler/metrics.rs @@ -24,6 +24,43 @@ pub(crate) struct MetricsScheduler { random_choices: usize, random_choices_metric: CountSummaryMetric, + + atomic_read: usize, + atomic_read_metric: CountSummaryMetric, + atomic_write: usize, + atomic_write_metric: CountSummaryMetric, + atomic_read_write: usize, + atomic_read_write_metric: CountSummaryMetric, + batch_semaphore_acq: usize, + batch_semaphore_acq_metric: CountSummaryMetric, + batch_semaphore_rel: usize, + batch_semaphore_rel_metric: CountSummaryMetric, + barrier_wait: usize, + barrier_wait_metric: CountSummaryMetric, + condvar_wait: usize, + condvar_wait_metric: CountSummaryMetric, + condvar_notify: usize, + condvar_notify_metric: CountSummaryMetric, + park: usize, + park_metric: CountSummaryMetric, + unpark: usize, + unpark_metric: CountSummaryMetric, + channel_send: usize, + channel_send_metric: CountSummaryMetric, + channel_recv: usize, + channel_recv_metric: CountSummaryMetric, + spawn: usize, + spawn_metric: CountSummaryMetric, + yield_event: usize, + yield_event_metric: CountSummaryMetric, + sleep: usize, + sleep_metric: CountSummaryMetric, + exit: usize, + exit_metric: CountSummaryMetric, + join: usize, + join_metric: CountSummaryMetric, + unknown: usize, + unknown_metric: CountSummaryMetric, } impl MetricsScheduler { @@ -46,6 +83,43 @@ impl MetricsScheduler { random_choices: 0, random_choices_metric: CountSummaryMetric::new(), + + atomic_read: 0, + atomic_read_metric: CountSummaryMetric::new(), + atomic_write: 0, + atomic_write_metric: CountSummaryMetric::new(), + atomic_read_write: 0, + atomic_read_write_metric: CountSummaryMetric::new(), + batch_semaphore_acq: 0, + batch_semaphore_acq_metric: CountSummaryMetric::new(), + batch_semaphore_rel: 0, + batch_semaphore_rel_metric: CountSummaryMetric::new(), + barrier_wait: 0, + barrier_wait_metric: CountSummaryMetric::new(), + condvar_wait: 0, + condvar_wait_metric: CountSummaryMetric::new(), + condvar_notify: 0, + condvar_notify_metric: CountSummaryMetric::new(), + park: 0, + park_metric: CountSummaryMetric::new(), + unpark: 0, + unpark_metric: CountSummaryMetric::new(), + channel_send: 0, + channel_send_metric: CountSummaryMetric::new(), + channel_recv: 0, + channel_recv_metric: CountSummaryMetric::new(), + spawn: 0, + spawn_metric: CountSummaryMetric::new(), + yield_event: 0, + yield_event_metric: CountSummaryMetric::new(), + sleep: 0, + sleep_metric: CountSummaryMetric::new(), + exit: 0, + exit_metric: CountSummaryMetric::new(), + join: 0, + join_metric: CountSummaryMetric::new(), + unknown: 0, + unknown_metric: CountSummaryMetric::new(), } } } @@ -62,6 +136,44 @@ impl MetricsScheduler { self.random_choices_metric.record(self.random_choices); self.random_choices = 0; + + // Record and reset event counters + self.atomic_read_metric.record(self.atomic_read); + self.atomic_read = 0; + self.atomic_write_metric.record(self.atomic_write); + self.atomic_write = 0; + self.atomic_read_write_metric.record(self.atomic_read_write); + self.atomic_read_write = 0; + self.batch_semaphore_acq_metric.record(self.batch_semaphore_acq); + self.batch_semaphore_acq = 0; + self.batch_semaphore_rel_metric.record(self.batch_semaphore_rel); + self.batch_semaphore_rel = 0; + self.barrier_wait_metric.record(self.barrier_wait); + self.barrier_wait = 0; + self.condvar_wait_metric.record(self.condvar_wait); + self.condvar_wait = 0; + self.condvar_notify_metric.record(self.condvar_notify); + self.condvar_notify = 0; + self.park_metric.record(self.park); + self.park = 0; + self.unpark_metric.record(self.unpark); + self.unpark = 0; + self.channel_send_metric.record(self.channel_send); + self.channel_send = 0; + self.channel_recv_metric.record(self.channel_recv); + self.channel_recv = 0; + self.spawn_metric.record(self.spawn); + self.spawn = 0; + self.yield_event_metric.record(self.yield_event); + self.yield_event = 0; + self.sleep_metric.record(self.sleep); + self.sleep = 0; + self.exit_metric.record(self.exit); + self.exit = 0; + self.join_metric.record(self.join); + self.join = 0; + self.unknown_metric.record(self.unknown); + self.unknown = 0; } } @@ -83,24 +195,48 @@ impl Scheduler for MetricsScheduler { self.inner.new_execution() } - fn next_task( + fn next_task<'a>( &mut self, - runnable_tasks: &[&Task], + runnable_tasks: &'a [&'a Task], current_task: Option, is_yielding: bool, - ) -> Option { - let choice = self.inner.next_task(runnable_tasks, current_task, is_yielding)?; + ) -> Option<&'a Task> { + let chosen_task = self.inner.next_task(runnable_tasks, current_task, is_yielding)?; + + // Update event counters based on the chosen task's next_event + use crate::runtime::task::Event; + match chosen_task.next_event() { + Event::AtomicRead(_, _) => self.atomic_read += 1, + Event::AtomicWrite(_, _) => self.atomic_write += 1, + Event::AtomicReadWrite(_, _) => self.atomic_read_write += 1, + Event::BatchSemaphoreAcq(_, _) => self.batch_semaphore_acq += 1, + Event::BatchSemaphoreRel(_, _) => self.batch_semaphore_rel += 1, + Event::BarrierWait(_, _) => self.barrier_wait += 1, + Event::CondvarWait(_, _) => self.condvar_wait += 1, + Event::CondvarNotify(_) => self.condvar_notify += 1, + Event::Park(_) => self.park += 1, + Event::Unpark(_, _) => self.unpark += 1, + Event::ChannelSend(_, _) => self.channel_send += 1, + Event::ChannelRecv(_, _) => self.channel_recv += 1, + Event::Spawn(_) => self.spawn += 1, + Event::Yield(_) => self.yield_event += 1, + Event::Sleep(_) => self.sleep += 1, + Event::Exit => self.exit += 1, + Event::Join(_, _) => self.join += 1, + Event::Unknown => self.unknown += 1, + } self.steps += 1; - if choice != self.last_task { + let choice_id = chosen_task.id(); + if choice_id != self.last_task { self.context_switches += 1; if runnable_tasks.iter().any(|t| t.id() == self.last_task) { self.preemptions += 1; } } - self.last_task = choice; + self.last_task = choice_id; - Some(choice) + Some(chosen_task) } fn next_u64(&mut self) -> u64 { @@ -125,6 +261,24 @@ impl Drop for MetricsScheduler { context_switches = %self.context_switches_metric, preemptions = %self.preemptions_metric, random_choices = %self.random_choices_metric, + atomic_read = %self.atomic_read_metric, + atomic_write = %self.atomic_write_metric, + atomic_read_write = %self.atomic_read_write_metric, + batch_semaphore_acq = %self.batch_semaphore_acq_metric, + batch_semaphore_rel = %self.batch_semaphore_rel_metric, + barrier_wait = %self.barrier_wait_metric, + condvar_wait = %self.condvar_wait_metric, + condvar_notify = %self.condvar_notify_metric, + park = %self.park_metric, + unpark = %self.unpark_metric, + channel_send = %self.channel_send_metric, + channel_recv = %self.channel_recv_metric, + spawn = %self.spawn_metric, + yield_event = %self.yield_event_metric, + sleep = %self.sleep_metric, + exit = %self.exit_metric, + join = %self.join_metric, + unknown = %self.unknown_metric, "run finished" ); } diff --git a/shuttle/src/scheduler/mod.rs b/shuttle/src/scheduler/mod.rs index 581a659e..0ea83f94 100644 --- a/shuttle/src/scheduler/mod.rs +++ b/shuttle/src/scheduler/mod.rs @@ -103,12 +103,12 @@ pub trait Scheduler { /// /// The list of runnable tasks is guaranteed to be non-empty. If `current_task` is `None`, the /// execution has not yet begun. - fn next_task( + fn next_task<'a>( &mut self, - runnable_tasks: &[&Task], + runnable_tasks: &'a [&'a Task], current_task: Option, is_yielding: bool, - ) -> Option; + ) -> Option<&'a Task>; /// Choose the next u64 value to return to the currently running task. fn next_u64(&mut self) -> u64; @@ -119,12 +119,12 @@ impl Scheduler for Box { self.as_mut().new_execution() } - fn next_task( + fn next_task<'a>( &mut self, - runnable_tasks: &[&Task], + runnable_tasks: &'a [&'a Task], current_task: Option, is_yielding: bool, - ) -> Option { + ) -> Option<&'a Task> { self.as_mut().next_task(runnable_tasks, current_task, is_yielding) } diff --git a/shuttle/src/scheduler/pct.rs b/shuttle/src/scheduler/pct.rs index 4f1176b9..9d9669e3 100644 --- a/shuttle/src/scheduler/pct.rs +++ b/shuttle/src/scheduler/pct.rs @@ -110,7 +110,12 @@ impl Scheduler for PctScheduler { Some(Schedule::new(self.data_source.reinitialize())) } - fn next_task(&mut self, runnable: &[&Task], current: Option, is_yielding: bool) -> Option { + fn next_task<'a>( + &mut self, + runnable: &'a [&'a Task], + current: Option, + is_yielding: bool, + ) -> Option<&'a Task> { // If any new tasks were created, assign them priorities by randomly swapping them with an // existing task's priority, so we maintain the invariant that every priority is distinct let max_known_task = self.priorities.len(); @@ -155,11 +160,10 @@ impl Scheduler for PctScheduler { // Choose the highest-priority (== lowest priority value) runnable task Some( - runnable + *runnable .iter() .min_by_key(|t| self.priorities.get(&t.id())) - .expect("priority queue invariant") - .id(), + .expect("priority queue invariant"), ) } diff --git a/shuttle/src/scheduler/random.rs b/shuttle/src/scheduler/random.rs index b1f40bf4..0724e359 100644 --- a/shuttle/src/scheduler/random.rs +++ b/shuttle/src/scheduler/random.rs @@ -88,8 +88,13 @@ impl Scheduler for RandomScheduler { } } - fn next_task(&mut self, runnable: &[&Task], _current: Option, _is_yielding: bool) -> Option { - Some(runnable.choose(&mut self.rng).unwrap().id()) + fn next_task<'a>( + &mut self, + runnable: &'a [&'a Task], + _current: Option, + _is_yielding: bool, + ) -> Option<&'a Task> { + Some(*runnable.choose(&mut self.rng).unwrap()) } fn next_u64(&mut self) -> u64 { diff --git a/shuttle/src/scheduler/replay.rs b/shuttle/src/scheduler/replay.rs index f97e91a7..acf79209 100644 --- a/shuttle/src/scheduler/replay.rs +++ b/shuttle/src/scheduler/replay.rs @@ -76,7 +76,12 @@ impl Scheduler for ReplayScheduler { } } - fn next_task(&mut self, runnable: &[&Task], _current: Option, _is_yielding: bool) -> Option { + fn next_task<'a>( + &mut self, + runnable: &'a [&'a Task], + _current: Option, + _is_yielding: bool, + ) -> Option<&'a Task> { loop { if self.steps >= self.schedule.steps.len() { assert!(self.allow_incomplete, "schedule ended early"); @@ -93,7 +98,7 @@ impl Scheduler for ReplayScheduler { if task.clock <= *target_clock { // The target event causally depends on this // event, so we schedule it. - return Some(next); + return Some(*task); } else { // The target event is concurrent with this // event, so it is irrelevant to the replay. @@ -114,7 +119,7 @@ impl Scheduler for ReplayScheduler { continue; } } else { - return Some(next); + return Some(*task); } } else { assert!( diff --git a/shuttle/src/scheduler/round_robin.rs b/shuttle/src/scheduler/round_robin.rs index 86f5635c..8cd32052 100644 --- a/shuttle/src/scheduler/round_robin.rs +++ b/shuttle/src/scheduler/round_robin.rs @@ -33,9 +33,14 @@ impl Scheduler for RoundRobinScheduler { } } - fn next_task(&mut self, runnable: &[&Task], current: Option, _is_yielding: bool) -> Option { + fn next_task<'a>( + &mut self, + runnable: &'a [&'a Task], + current: Option, + _is_yielding: bool, + ) -> Option<&'a Task> { if current.is_none() { - return Some(runnable.first().unwrap().id()); + return Some(runnable.first().unwrap()); } let current = current.unwrap(); @@ -43,8 +48,7 @@ impl Scheduler for RoundRobinScheduler { runnable .iter() .find(|t| t.id() > current) - .unwrap_or_else(|| runnable.first().unwrap()) - .id(), + .unwrap_or_else(|| runnable.first().unwrap()), ) } diff --git a/shuttle/src/scheduler/uncontrolled_nondeterminism.rs b/shuttle/src/scheduler/uncontrolled_nondeterminism.rs index e74d85af..03b9eef7 100644 --- a/shuttle/src/scheduler/uncontrolled_nondeterminism.rs +++ b/shuttle/src/scheduler/uncontrolled_nondeterminism.rs @@ -58,12 +58,12 @@ impl Scheduler for UncontrolledNondeterminismCheckScheduler { out } - fn next_task( + fn next_task<'a>( &mut self, - runnable_tasks: &[&Task], + runnable_tasks: &'a [&'a Task], current_task: Option, is_yielding: bool, - ) -> Option { + ) -> Option<&'a Task> { if self.recording { let choice = self.scheduler.next_task(runnable_tasks, current_task, is_yielding); let runnable_ids = runnable_tasks @@ -71,7 +71,7 @@ impl Scheduler for UncontrolledNondeterminismCheckScheduler { .map(|t| t.id()) .collect::>(); self.previous_schedule - .push(ScheduleRecord::Task(choice, runnable_ids, is_yielding)); + .push(ScheduleRecord::Task(choice.map(|t| t.id()), runnable_ids, is_yielding)); choice } else { @@ -99,7 +99,7 @@ impl Scheduler for UncontrolledNondeterminismCheckScheduler { self.current_step += 1; - *maybe_id + maybe_id.and_then(|id| runnable_tasks.iter().find(|t| t.id() == id).copied()) } ScheduleRecord::Random(_) => { panic!("possible nondeterminism: next step was context switch, but recording expected random number generation") diff --git a/shuttle/src/scheduler/urw.rs b/shuttle/src/scheduler/urw.rs index e74f4059..3e7ab147 100644 --- a/shuttle/src/scheduler/urw.rs +++ b/shuttle/src/scheduler/urw.rs @@ -123,7 +123,7 @@ impl UrwRandomScheduler { self.state = UrwSchedulerState::Initialized; } - fn next_task_urw(&mut self, runnable: &[&Task]) -> Option { + fn next_task_urw<'a>(&mut self, runnable: &[&'a Task]) -> Option<&'a Task> { let task_event_counts = self.task_event_counts.as_mut().unwrap(); // We need to loop over the tasks to identify if there has been a `spawn` event @@ -157,15 +157,14 @@ impl UrwRandomScheduler { assert!(task_event_counts[tid] >= 1); } - let next_tid = runnable + let next_t = *runnable .choose_weighted(&mut self.rng, |t| task_event_counts[get_tid(t)]) - .unwrap() - .id(); - let next_tid_usize: usize = next_tid.into(); + .unwrap(); + let next_tid_usize: usize = next_t.id().into(); task_event_counts[next_tid_usize] = task_event_counts[next_tid_usize].saturating_sub(1).max(1); trace!("URW remaining event counts: {:?}", task_event_counts); - Some(next_tid) + Some(next_t) } } @@ -192,12 +191,17 @@ impl Scheduler for UrwRandomScheduler { Some(Schedule::new(seed)) } - fn next_task(&mut self, runnable: &[&Task], _current: Option, _is_yielding: bool) -> Option { + fn next_task<'a>( + &mut self, + runnable: &'a [&'a Task], + _current: Option, + _is_yielding: bool, + ) -> Option<&'a Task> { match self.state { UrwSchedulerState::PreEstimation => unreachable!(), UrwSchedulerState::Estimating => { // Delegate scheduling to vanilla RW when estimating counts - let t = runnable.choose(&mut self.rng).unwrap(); + let t = *runnable.choose(&mut self.rng).unwrap(); // If we don't have event counts yet, use the current run to estimate event counts (1-shot) self.signature_event_counts @@ -212,7 +216,7 @@ impl Scheduler for UrwRandomScheduler { 1 }); - Some(t.id()) + Some(t) } UrwSchedulerState::Initialized => self.next_task_urw(runnable), } diff --git a/shuttle/src/sync/atomic/bool.rs b/shuttle/src/sync/atomic/bool.rs index deb3be86..e55da4c2 100644 --- a/shuttle/src/sync/atomic/bool.rs +++ b/shuttle/src/sync/atomic/bool.rs @@ -44,16 +44,19 @@ impl AtomicBool { } /// Loads a value from the atomic boolean. + #[track_caller] pub fn load(&self, order: Ordering) -> bool { self.inner.load(order) } /// Stores a value into the atomic boolean. + #[track_caller] pub fn store(&self, val: bool, order: Ordering) { self.inner.store(val, order) } /// Stores a value into the atomic boolean, returning the previous value. + #[track_caller] pub fn swap(&self, val: bool, order: Ordering) -> bool { self.inner.swap(val, order) } @@ -61,6 +64,7 @@ impl AtomicBool { /// Fetches the value, and applies a function to it that returns an optional new value. /// Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else /// `Err(previous_value)`. + #[track_caller] pub fn fetch_update(&self, set_order: Ordering, fetch_order: Ordering, f: F) -> Result where F: FnMut(bool) -> Option, @@ -71,6 +75,7 @@ impl AtomicBool { /// Stores a value into the atomic boolean if the current value is the same as the /// `current` value. #[deprecated(since = "0.0.6", note = "Use `compare_exchange` or `compare_exchange_weak` instead")] + #[track_caller] pub fn compare_and_swap(&self, current: bool, new: bool, order: Ordering) -> bool { match self.compare_exchange(current, new, order, order) { Ok(v) => v, @@ -84,6 +89,7 @@ impl AtomicBool { /// The return value is a result indicating whether the new value was written and /// containing the previous value. On success this value is guaranteed to be equal to /// `current`. + #[track_caller] pub fn compare_exchange( &self, current: bool, @@ -102,6 +108,7 @@ impl AtomicBool { /// platforms. The return value is a result indicating whether the new value was written /// and containing the previous value. // TODO actually produce spurious failures + #[track_caller] pub fn compare_exchange_weak( &self, current: bool, @@ -113,21 +120,25 @@ impl AtomicBool { } /// Logical "and" with the current value. Returns the previous value. + #[track_caller] pub fn fetch_and(&self, val: bool, order: Ordering) -> bool { self.fetch_update(order, order, |old| Some(old & val)).unwrap() } /// Logical "nand" with the current value. Returns the previous value. + #[track_caller] pub fn fetch_nand(&self, val: bool, order: Ordering) -> bool { self.fetch_update(order, order, |old| Some(!(old & val))).unwrap() } /// Logical "or" with the current value. Returns the previous value. + #[track_caller] pub fn fetch_or(&self, val: bool, order: Ordering) -> bool { self.fetch_update(order, order, |old| Some(old | val)).unwrap() } /// Logical "xor" with the current value. Returns the previous value. + #[track_caller] pub fn fetch_xor(&self, val: bool, order: Ordering) -> bool { self.fetch_update(order, order, |old| Some(old ^ val)).unwrap() } diff --git a/shuttle/src/sync/atomic/int.rs b/shuttle/src/sync/atomic/int.rs index b596d928..b9355605 100644 --- a/shuttle/src/sync/atomic/int.rs +++ b/shuttle/src/sync/atomic/int.rs @@ -46,16 +46,19 @@ macro_rules! atomic_int { } /// Loads a value from the atomic integer. + #[track_caller] pub fn load(&self, order: Ordering) -> $int_type { self.inner.load(order) } /// Stores a value into the atomic integer. + #[track_caller] pub fn store(&self, val: $int_type, order: Ordering) { self.inner.store(val, order) } /// Stores a value into the atomic integer, returning the previous value. + #[track_caller] pub fn swap(&self, val: $int_type, order: Ordering) -> $int_type { self.inner.swap(val, order) } @@ -63,6 +66,7 @@ macro_rules! atomic_int { /// Fetches the value, and applies a function to it that returns an optional new value. /// Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else /// `Err(previous_value)`. + #[track_caller] pub fn fetch_update( &self, set_order: Ordering, @@ -81,6 +85,7 @@ macro_rules! atomic_int { since = "0.0.6", note = "Use `compare_exchange` or `compare_exchange_weak` instead" )] + #[track_caller] pub fn compare_and_swap(&self, current: $int_type, new: $int_type, order: Ordering) -> $int_type { match self.compare_exchange(current, new, order, order) { Ok(v) => v, @@ -94,6 +99,7 @@ macro_rules! atomic_int { /// The return value is a result indicating whether the new value was written and /// containing the previous value. On success this value is guaranteed to be equal to /// `current`. + #[track_caller] pub fn compare_exchange( &self, current: $int_type, @@ -112,6 +118,7 @@ macro_rules! atomic_int { /// The return value is a result indicating whether the new value was written and /// containing the previous value. // TODO actually produce spurious failures + #[track_caller] pub fn compare_exchange_weak( &self, current: $int_type, @@ -125,6 +132,7 @@ macro_rules! atomic_int { /// Adds to the current value, returning the previous value. /// /// This operation wraps around on overflow. + #[track_caller] pub fn fetch_add(&self, val: $int_type, order: Ordering) -> $int_type { self.fetch_update(order, order, |old| Some(old.wrapping_add(val))) .unwrap() @@ -133,37 +141,44 @@ macro_rules! atomic_int { /// Subtracts from the current value, returning the previous value. /// /// This operation wraps around on overflow. + #[track_caller] pub fn fetch_sub(&self, val: $int_type, order: Ordering) -> $int_type { self.fetch_update(order, order, |old| Some(old.wrapping_sub(val))) .unwrap() } /// Bitwise "and" with the current value. Returns the previous value. + #[track_caller] pub fn fetch_and(&self, val: $int_type, order: Ordering) -> $int_type { self.fetch_update(order, order, |old| Some(old & val)).unwrap() } /// Bitwise "nand" with the current value. Returns the previous value. + #[track_caller] pub fn fetch_nand(&self, val: $int_type, order: Ordering) -> $int_type { self.fetch_update(order, order, |old| Some(!(old & val))).unwrap() } /// Bitwise "or" with the current value. Returns the previous value. + #[track_caller] pub fn fetch_or(&self, val: $int_type, order: Ordering) -> $int_type { self.fetch_update(order, order, |old| Some(old | val)).unwrap() } /// Bitwise "xor" with the current value. Returns the previous value. + #[track_caller] pub fn fetch_xor(&self, val: $int_type, order: Ordering) -> $int_type { self.fetch_update(order, order, |old| Some(old ^ val)).unwrap() } /// Maximum with the current value. Returns the previous value. + #[track_caller] pub fn fetch_max(&self, val: $int_type, order: Ordering) -> $int_type { self.fetch_update(order, order, |old| Some(old.max(val))).unwrap() } /// Minimum with the current value. Returns the previous value. + #[track_caller] pub fn fetch_min(&self, val: $int_type, order: Ordering) -> $int_type { self.fetch_update(order, order, |old| Some(old.min(val))).unwrap() } diff --git a/shuttle/src/sync/atomic/mod.rs b/shuttle/src/sync/atomic/mod.rs index 4792c9ba..c6e30f63 100644 --- a/shuttle/src/sync/atomic/mod.rs +++ b/shuttle/src/sync/atomic/mod.rs @@ -61,6 +61,7 @@ pub use std::sync::atomic::Ordering; use crate::runtime::execution::ExecutionState; use crate::runtime::task::clock::VectorClock; +use crate::runtime::task::Event; use crate::runtime::thread; use crate::silence_warnings; use crate::sync::{ResourceSignature, ResourceType}; @@ -121,7 +122,6 @@ pub use std::sync::atomic::compiler_fence; struct Atomic { inner: RefCell, clock: RefCell>, // wrapped in option to support the const new() - #[allow(unused)] signature: ResourceSignature, } @@ -159,34 +159,38 @@ impl Atomic { self.inner.into_inner() } + #[track_caller] fn load(&self, order: Ordering) -> T { maybe_warn_about_ordering(order); - thread::switch(); + thread::switch(Event::atomic_read(&self.signature)); self.exhale_clock(); let value = *self.inner.borrow(); value } + #[track_caller] fn store(&self, val: T, order: Ordering) { maybe_warn_about_ordering(order); - thread::switch(); + thread::switch(Event::atomic_write(&self.signature)); self.inhale_clock(); *self.inner.borrow_mut() = val; } + #[track_caller] fn swap(&self, mut val: T, order: Ordering) -> T { maybe_warn_about_ordering(order); // swap behaves like { let x = load() ; store(val) ; x } - thread::switch(); + thread::switch(Event::atomic_read_write(&self.signature)); self.exhale_clock(); // for the load self.inhale_clock(); // for the store std::mem::swap(&mut *self.inner.borrow_mut(), &mut val); val } + #[track_caller] fn fetch_update(&self, set_order: Ordering, fetch_order: Ordering, mut f: F) -> Result where F: FnMut(T) -> Option, @@ -196,7 +200,7 @@ impl Atomic { // fetch_update behaves like (ignoring error): { let x = load() ; store(f(x)); x } // in the error case, there is no store, so the register does not inherit the clock of the caller - thread::switch(); + thread::switch(Event::atomic_read_write(&self.signature)); self.exhale_clock(); // for the load() let current = *self.inner.borrow(); let ret = if let Some(new) = f(current) { diff --git a/shuttle/src/sync/atomic/ptr.rs b/shuttle/src/sync/atomic/ptr.rs index 8df4d31c..733921ec 100644 --- a/shuttle/src/sync/atomic/ptr.rs +++ b/shuttle/src/sync/atomic/ptr.rs @@ -49,16 +49,19 @@ impl AtomicPtr { } /// Loads a value from the pointer. + #[track_caller] pub fn load(&self, order: Ordering) -> *mut T { self.inner.load(order) } /// Stores a value into the pointer. + #[track_caller] pub fn store(&self, val: *mut T, order: Ordering) { self.inner.store(val, order) } /// Stores a value into the atomic pointer, returning the previous value. + #[track_caller] pub fn swap(&self, val: *mut T, order: Ordering) -> *mut T { self.inner.swap(val, order) } @@ -66,6 +69,7 @@ impl AtomicPtr { /// Fetches the value, and applies a function to it that returns an optional new value. /// Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else /// `Err(previous_value)`. + #[track_caller] pub fn fetch_update(&self, set_order: Ordering, fetch_order: Ordering, f: F) -> Result<*mut T, *mut T> where F: FnMut(*mut T) -> Option<*mut T>, @@ -76,6 +80,7 @@ impl AtomicPtr { /// Stores a value into the atomic pointer if the current value is the same as the /// `current` value. #[deprecated(since = "0.0.6", note = "Use `compare_exchange` or `compare_exchange_weak` instead")] + #[track_caller] pub fn compare_and_swap(&self, current: *mut T, new: *mut T, order: Ordering) -> *mut T { match self.compare_exchange(current, new, order, order) { Ok(v) => v, @@ -89,6 +94,7 @@ impl AtomicPtr { /// The return value is a result indicating whether the new value was written and /// containing the previous value. On success this value is guaranteed to be equal to /// `current`. + #[track_caller] pub fn compare_exchange( &self, current: *mut T, @@ -107,6 +113,7 @@ impl AtomicPtr { /// platforms. The return value is a result indicating whether the new value was written /// and containing the previous value. // TODO actually produce spurious failures + #[track_caller] pub fn compare_exchange_weak( &self, current: *mut T, diff --git a/shuttle/src/sync/barrier.rs b/shuttle/src/sync/barrier.rs index 3f688381..8783a0aa 100644 --- a/shuttle/src/sync/barrier.rs +++ b/shuttle/src/sync/barrier.rs @@ -1,6 +1,6 @@ use crate::runtime::execution::ExecutionState; use crate::runtime::task::clock::VectorClock; -use crate::runtime::task::TaskId; +use crate::runtime::task::{Event, TaskId}; use crate::runtime::thread; use crate::sync::{ResourceSignature, ResourceType}; use std::cell::RefCell; @@ -72,7 +72,6 @@ impl fmt::Debug for BarrierState { /// A barrier enables multiple threads to synchronize the beginning of some computation. pub struct Barrier { state: Rc>, - #[allow(unused)] signature: ResourceSignature, } @@ -97,6 +96,7 @@ impl Barrier { } /// Blocks the current thread until all threads have rendezvoused here. + #[track_caller] pub fn wait(&self) -> BarrierWaitResult { let state = self.state.borrow_mut(); // The barrier will block if the number of current waiters *plus* an additional waiter @@ -114,7 +114,7 @@ impl Barrier { // `Y1 Z` and `Z Y1`, the state of the barrier is {T1, T2}. As a result, we never need to // switch before blocking on a barrier wait. if !will_block { - thread::switch(); + thread::switch(Event::barrier_wait(&self.signature)); } let mut state = self.state.borrow_mut(); let my_epoch = state.epoch; @@ -134,7 +134,7 @@ impl Barrier { trace!(waiters=?state.waiters, epoch=my_epoch, "blocked on barrier {:?}", self); drop(state); ExecutionState::with(|s| s.current_mut().block(false)); - thread::switch(); + thread::switch(Event::barrier_wait(&self.signature)); } else { trace!(waiters=?state.waiters, epoch=my_epoch, "releasing waiters on barrier {:?}", self); diff --git a/shuttle/src/sync/condvar.rs b/shuttle/src/sync/condvar.rs index 813adccb..923a2afd 100644 --- a/shuttle/src/sync/condvar.rs +++ b/shuttle/src/sync/condvar.rs @@ -1,7 +1,7 @@ use crate::current; use crate::runtime::execution::ExecutionState; use crate::runtime::task::clock::VectorClock; -use crate::runtime::task::TaskId; +use crate::runtime::task::{Event, TaskId}; use crate::runtime::thread; use crate::sync::{MutexGuard, ResourceSignature, ResourceType}; use assoc::AssocExt; @@ -16,7 +16,6 @@ use tracing::trace; #[derive(Debug)] pub struct Condvar { state: RefCell, - #[allow(unused)] signature: ResourceSignature, } @@ -131,6 +130,7 @@ impl Condvar { } /// Blocks the current thread until this condition variable receives a notification. + #[track_caller] pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> LockResult> { let me = ExecutionState::me(); @@ -152,7 +152,7 @@ impl Condvar { // TODO: Condvar::wait should allow for spurious wakeups. ExecutionState::with(|s| s.current_mut().block(false)); - thread::switch(); + thread::switch(Event::condvar_wait(&self.signature)); // After the context switch, consume whichever signal that woke this thread let mut state = self.state.borrow_mut(); @@ -240,8 +240,9 @@ impl Condvar { /// /// If there is a blocked thread on this condition variable, then it will be woken up from its /// call to wait or wait_timeout. Calls to notify_one are not buffered in any way. + #[track_caller] pub fn notify_one(&self) { - thread::switch(); + thread::switch(Event::condvar_notify()); let me = ExecutionState::me(); @@ -277,8 +278,9 @@ impl Condvar { } /// Wakes up all blocked threads on this condvar. + #[track_caller] pub fn notify_all(&self) { - thread::switch(); + thread::switch(Event::condvar_notify()); let me = ExecutionState::me(); diff --git a/shuttle/src/sync/mpsc.rs b/shuttle/src/sync/mpsc.rs index 82bd7cbc..646d6d57 100644 --- a/shuttle/src/sync/mpsc.rs +++ b/shuttle/src/sync/mpsc.rs @@ -2,7 +2,7 @@ use crate::runtime::execution::ExecutionState; use crate::runtime::task::clock::VectorClock; -use crate::runtime::task::{TaskId, DEFAULT_INLINE_TASKS}; +use crate::runtime::task::{Event, TaskId, DEFAULT_INLINE_TASKS}; use crate::runtime::thread; use crate::sync::{ResourceSignature, ResourceType}; use smallvec::SmallVec; @@ -47,7 +47,6 @@ pub fn sync_channel(bound: usize) -> (SyncSender, Receiver) { struct Channel { bound: Option, // None for an unbounded channel, Some(k) for a bounded channel of size k state: Rc>>, - #[allow(unused)] signature: ResourceSignature, } @@ -134,10 +133,12 @@ impl Channel { } } + #[track_caller] fn try_send(&self, message: T) -> Result<(), TrySendError> { self.send_internal(message, false) } + #[track_caller] fn send(&self, message: T) -> Result<(), SendError> { self.send_internal(message, true).map_err(|e| match e { TrySendError::Full(_) => unreachable!(), @@ -166,10 +167,11 @@ impl Channel { is_full || !state.waiting_senders.is_empty() || (is_rendezvous && state.waiting_receivers.is_empty()) } + #[track_caller] fn send_internal(&self, message: T, can_block: bool) -> Result<(), TrySendError> { // Because channels are always fair wrt. waiting senders (waiting senders is an *ordered* list), // blocking sends do not commute thus must always provide a switch before blocking - thread::switch(); + thread::switch(Event::channel_send(&self.signature)); let me = ExecutionState::me(); let mut state = self.state.borrow_mut(); @@ -201,7 +203,7 @@ impl Channel { ExecutionState::with(|s| s.current_mut().block(false)); drop(state); - thread::switch(); + thread::switch(Event::channel_send(&self.signature)); state = self.state.borrow_mut(); trace!( @@ -259,6 +261,7 @@ impl Channel { Ok(()) } + #[track_caller] fn recv(&self) -> Result { self.recv_internal(true).map_err(|e| match e { TryRecvError::Disconnected => RecvError, @@ -266,6 +269,7 @@ impl Channel { }) } + #[track_caller] fn try_recv(&self) -> Result { self.recv_internal(false) } @@ -277,10 +281,11 @@ impl Channel { state.messages.is_empty() || !state.waiting_receivers.is_empty() } + #[track_caller] fn recv_internal(&self, can_block: bool) -> Result { // Because channels are always fair wrt. waiting receivers (waiting receivers is an *ordered* list), // blocking receives do not commute thus must always provide a switch before blocking - thread::switch(); + thread::switch(Event::channel_recv(&self.signature)); let me = ExecutionState::me(); let mut state = self.state.borrow_mut(); @@ -339,7 +344,7 @@ impl Channel { ExecutionState::with(|s| s.current_mut().block(false)); drop(state); - thread::switch(); + thread::switch(Event::channel_recv(&self.signature)); state = self.state.borrow_mut(); trace!( diff --git a/shuttle/src/sync/mutex.rs b/shuttle/src/sync/mutex.rs index 6174513b..ebefc7cc 100644 --- a/shuttle/src/sync/mutex.rs +++ b/shuttle/src/sync/mutex.rs @@ -1,6 +1,6 @@ use crate::current; use crate::future::batch_semaphore::{BatchSemaphore, Fairness}; -use crate::runtime::task::TaskId; +use crate::runtime::task::{Event, TaskId}; use crate::runtime::thread; use crate::sync::{LockResult, PoisonError, TryLockError, TryLockResult}; use crate::sync::{ResourceSignature, ResourceType}; @@ -69,7 +69,7 @@ impl Mutex { self.semaphore.acquire_blocking(1).unwrap(); } else { // we always need to allow for a context switch to make the previous event visible for completeness - thread::switch(); + thread::switch(Event::Unknown); } state = self.state.borrow_mut(); diff --git a/shuttle/src/sync/rwlock.rs b/shuttle/src/sync/rwlock.rs index d0b6c7ec..72c0e80d 100644 --- a/shuttle/src/sync/rwlock.rs +++ b/shuttle/src/sync/rwlock.rs @@ -1,6 +1,6 @@ use crate::future::batch_semaphore::{BatchSemaphore, Fairness}; use crate::runtime::execution::ExecutionState; -use crate::runtime::task::{TaskId, TaskSet}; +use crate::runtime::task::{Event, TaskId, TaskSet}; use crate::runtime::thread; use crate::sync::{ResourceSignature, ResourceType}; use std::cell::RefCell; @@ -218,7 +218,7 @@ impl RwLock { self.semaphore.acquire_blocking(typ.num_permits()).unwrap(); } else { // we always need to allow for a context switch to make the previous event visible for completeness - thread::switch(); + thread::switch(Event::Unknown); } state = self.state.borrow_mut(); diff --git a/shuttle/src/thread.rs b/shuttle/src/thread.rs index 5d945ac3..e597f24f 100644 --- a/shuttle/src/thread.rs +++ b/shuttle/src/thread.rs @@ -1,7 +1,7 @@ //! Shuttle's implementation of [`std::thread`]. use crate::runtime::execution::ExecutionState; -use crate::runtime::task::TaskId; +use crate::runtime::task::{Event, TaskId}; use crate::runtime::thread; use std::marker::PhantomData; use std::panic::Location; @@ -42,8 +42,10 @@ impl Thread { } /// Atomically makes the handle's token available if it is not already. + #[track_caller] pub fn unpark(&self) { - thread::switch(); + let target_task_signature = ExecutionState::with(|s| s.get(self.id.task_id).signature.clone()); + thread::switch(Event::unpark(&target_task_signature)); ExecutionState::with(|s| { s.get_mut(self.id.task_id).unpark(); @@ -91,7 +93,7 @@ impl<'scope> Scope<'scope, '_> { let ret = f(); if ExecutionState::with(|s| s.exit_current_truncates_execution()) { - thread::switch(); + thread::switch(Event::Exit); } finished.store(true, Ordering::Relaxed); @@ -121,6 +123,7 @@ impl<'scope> Scope<'scope, '_> { /// /// The function passed to `scope` will be provided a [`Scope`] object, /// through which scoped threads can be [spawned][`Scope::spawn`]. +#[track_caller] pub fn scope<'env, F, T>(f: F) -> T where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T, @@ -137,7 +140,7 @@ where if scope.num_running_threads.load(Ordering::Relaxed) != 0 { tracing::info!("thread blocked, waiting for completion of scoped threads"); ExecutionState::with(|s| s.current_mut().block(false)); - thread::switch(); + thread::switch(Event::park()); } ret @@ -228,7 +231,7 @@ pub(crate) fn thread_fn( if switch_before_exit && ExecutionState::with(|s| s.exit_current_truncates_execution()) { // Exiting the last attached task can truncate the execution. To make the previous // event visible before truncation, we need a scheduling point before exiting. - thread::switch(); + thread::switch(Event::Exit); } tracing::trace!("thread finished, dropping thread locals"); @@ -299,11 +302,13 @@ unsafe impl Sync for JoinHandle {} impl JoinHandle { /// Waits for the associated thread to finish. + #[track_caller] pub fn join(self) -> Result { + let target_task_signature = ExecutionState::with(|s| s.get(self.task_id).signature.clone()); let is_finished = ExecutionState::with(|state| state.get(self.task_id).finished()); // If the joinee task is finished then the joiner will not block if is_finished { - thread::switch(); + thread::switch(Event::join(&target_task_signature)); } let should_block = ExecutionState::with(|state| { @@ -318,7 +323,7 @@ impl JoinHandle { }); if should_block { - thread::switch(); + thread::switch(Event::join(&target_task_signature)); } // Waiting thread inherits the clock of the finished thread @@ -341,17 +346,19 @@ impl JoinHandle { /// /// Some Shuttle schedulers use this as a hint to deprioritize the current thread in order for other /// threads to make progress (e.g., in a spin loop). +#[track_caller] pub fn yield_now() { let waker = ExecutionState::with(|state| state.current().waker()); waker.wake_by_ref(); ExecutionState::request_yield(); - thread::switch(); + thread::switch(Event::yield_now()); } /// Puts the current thread to sleep for at least the specified amount of time. // Note that Shuttle does not model time, so this behaves just like a context switch. +#[track_caller] pub fn sleep(_dur: Duration) { - thread::switch(); + thread::switch(Event::sleep()); } /// Get a handle to the thread that invokes it @@ -368,6 +375,7 @@ pub fn current() -> Thread { } /// Blocks unless or until the current thread's token is made available (may wake spuriously). +#[track_caller] pub fn park() { let switch = ExecutionState::with(|s| s.current_mut().park()); @@ -379,7 +387,7 @@ pub fn park() { // context would result in spurious wakeups triggering nearly every time. if switch { ExecutionState::request_yield(); - thread::switch(); + thread::switch(Event::park()); } } diff --git a/shuttle/tests/basic/execution.rs b/shuttle/tests/basic/execution.rs index 3713153e..9ee8e7e6 100644 --- a/shuttle/tests/basic/execution.rs +++ b/shuttle/tests/basic/execution.rs @@ -125,17 +125,17 @@ fn max_steps_early_exit_scheduler() { } } - fn next_task( + fn next_task<'a>( &mut self, - runnable_tasks: &[&Task], + runnable_tasks: &'a [&'a Task], _current_task: Option, _is_yielding: bool, - ) -> Option { + ) -> Option<&'a Task> { if self.steps >= self.max_steps { None } else { self.steps += 1; - Some(runnable_tasks.first().unwrap().id()) + Some(runnable_tasks.first().unwrap()) } } diff --git a/shuttle/tests/basic/timeout.rs b/shuttle/tests/basic/timeout.rs index 00145002..278ac365 100644 --- a/shuttle/tests/basic/timeout.rs +++ b/shuttle/tests/basic/timeout.rs @@ -25,12 +25,12 @@ impl Scheduler for SleepableScheduler { self.scheduler.new_execution() } - fn next_task( + fn next_task<'a>( &mut self, - runnable_tasks: &[&Task], + runnable_tasks: &'a [&'a Task], current_task: Option, is_yielding: bool, - ) -> Option { + ) -> Option<&'a Task> { self.scheduler.next_task(runnable_tasks, current_task, is_yielding) }