From 5e16678f12a873714924909887132dff81803d3e Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Wed, 17 Oct 2018 01:12:20 -0600 Subject: [PATCH 01/10] Refactor Notifier out of future/shared So that we can also use it for stream/shared --- futures-util/src/future/future/shared.rs | 126 ++++------------------- futures-util/src/lib.rs | 2 + futures-util/src/wakerset.rs | 120 +++++++++++++++++++++ 3 files changed, 143 insertions(+), 105 deletions(-) create mode 100644 futures-util/src/wakerset.rs diff --git a/futures-util/src/future/future/shared.rs b/futures-util/src/future/future/shared.rs index f26b20d926..8f704d8223 100644 --- a/futures-util/src/future/future/shared.rs +++ b/futures-util/src/future/future/shared.rs @@ -1,4 +1,5 @@ -use crate::task::{waker_ref, ArcWake}; +use crate::task::waker_ref; +use crate::wakerset::{WakerKey, WakerSet}; use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; use core::fmt; @@ -8,29 +9,19 @@ use core::ptr; use core::sync::atomic::AtomicUsize; use core::sync::atomic::Ordering::{Acquire, SeqCst}; use futures_core::future::{FusedFuture, Future}; -use futures_core::task::{Context, Poll, Waker}; -use slab::Slab; - -#[cfg(feature = "std")] -type Mutex = std::sync::Mutex; -#[cfg(not(feature = "std"))] -type Mutex = spin::Mutex; +use futures_core::task::{Context, Poll}; /// Future for the [`shared`](super::FutureExt::shared) method. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Shared { inner: Option>>, - waker_key: usize, + waker_key: WakerKey, } struct Inner { future_or_output: UnsafeCell>, - notifier: Arc, -} - -struct Notifier { state: AtomicUsize, - wakers: Mutex>>>, + notifier: Arc, } /// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`. @@ -87,19 +78,15 @@ const POLLING: usize = 1; const COMPLETE: usize = 2; const POISONED: usize = 3; -const NULL_WAKER_KEY: usize = usize::MAX; - impl Shared { pub(super) fn new(future: Fut) -> Self { let inner = Inner { future_or_output: UnsafeCell::new(FutureOrOutput::Future(future)), - notifier: Arc::new(Notifier { - state: AtomicUsize::new(IDLE), - wakers: Mutex::new(Some(Slab::new())), - }), + state: AtomicUsize::new(IDLE), + notifier: Arc::new(WakerSet::new()), }; - Self { inner: Some(Arc::new(inner)), waker_key: NULL_WAKER_KEY } + Self { inner: Some(Arc::new(inner)), waker_key: WakerKey::NULL } } } @@ -113,7 +100,7 @@ where /// [`poll`](Future::poll). pub fn peek(&self) -> Option<&Fut::Output> { if let Some(inner) = self.inner.as_ref() { - match inner.notifier.state.load(SeqCst) { + match inner.state.load(SeqCst) { COMPLETE => unsafe { return Some(inner.output()) }, POISONED => panic!("inner future panicked during poll"), _ => {} @@ -207,34 +194,6 @@ where Fut: Future, Fut::Output: Clone, { - /// Registers the current task to receive a wakeup when we are awoken. - fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) { - #[cfg(feature = "std")] - let mut wakers_guard = self.notifier.wakers.lock().unwrap(); - #[cfg(not(feature = "std"))] - let mut wakers_guard = self.notifier.wakers.lock(); - - let wakers_mut = wakers_guard.as_mut(); - - let wakers = match wakers_mut { - Some(wakers) => wakers, - None => return, - }; - - let new_waker = cx.waker(); - - if *waker_key == NULL_WAKER_KEY { - *waker_key = wakers.insert(Some(new_waker.clone())); - } else { - match wakers[*waker_key] { - Some(ref old_waker) if new_waker.will_wake(old_waker) => {} - // Could use clone_from here, but Waker doesn't specialize it. - ref mut slot => *slot = Some(new_waker.clone()), - } - } - debug_assert!(*waker_key != NULL_WAKER_KEY); - } - /// Safety: callers must first ensure that `inner.state` /// is `COMPLETE` unsafe fn take_or_clone_output(self: Arc) -> Fut::Output { @@ -271,19 +230,14 @@ where let inner = this.inner.take().expect("Shared future polled again after completion"); // Fast path for when the wrapped future has already completed - if inner.notifier.state.load(Acquire) == COMPLETE { + if inner.state.load(Acquire) == COMPLETE { // Safety: We're in the COMPLETE state return unsafe { Poll::Ready(inner.take_or_clone_output()) }; } - inner.record_waker(&mut this.waker_key, cx); + inner.notifier.record_waker(&mut this.waker_key, cx); - match inner - .notifier - .state - .compare_exchange(IDLE, POLLING, SeqCst, SeqCst) - .unwrap_or_else(|x| x) - { + match inner.state.compare_exchange(IDLE, POLLING, SeqCst, SeqCst).unwrap_or_else(|x| x) { IDLE => { // Lock acquired, fall through } @@ -317,7 +271,7 @@ where } } - let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false }; + let mut reset = Reset { state: &inner.state, did_not_panic: false }; let output = { let future = unsafe { @@ -332,8 +286,7 @@ where match poll_result { Poll::Pending => { - if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() - { + if inner.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() { // Success drop(reset); this.inner = Some(inner); @@ -350,21 +303,12 @@ where *inner.future_or_output.get() = FutureOrOutput::Output(output); } - inner.notifier.state.store(COMPLETE, SeqCst); + inner.state.store(COMPLETE, SeqCst); - // Wake all tasks and drop the slab - #[cfg(feature = "std")] - let mut wakers_guard = inner.notifier.wakers.lock().unwrap(); - #[cfg(not(feature = "std"))] - let mut wakers_guard = inner.notifier.wakers.lock(); - - let mut wakers = wakers_guard.take().unwrap(); - for waker in wakers.drain().flatten() { - waker.wake(); - } + // Wake all tasks + inner.notifier.wake_and_finish(); drop(reset); // Make borrow checker happy - drop(wakers_guard); // Safety: We're in the COMPLETE state unsafe { Poll::Ready(inner.take_or_clone_output()) } @@ -376,7 +320,7 @@ where Fut: Future, { fn clone(&self) -> Self { - Self { inner: self.inner.clone(), waker_key: NULL_WAKER_KEY } + Self { inner: self.inner.clone(), waker_key: WakerKey::NULL } } } @@ -385,36 +329,8 @@ where Fut: Future, { fn drop(&mut self) { - if self.waker_key != NULL_WAKER_KEY { - if let Some(ref inner) = self.inner { - #[cfg(feature = "std")] - if let Ok(mut wakers) = inner.notifier.wakers.lock() { - if let Some(wakers) = wakers.as_mut() { - wakers.remove(self.waker_key); - } - } - #[cfg(not(feature = "std"))] - if let Some(wakers) = inner.notifier.wakers.lock().as_mut() { - wakers.remove(self.waker_key); - } - } - } - } -} - -impl ArcWake for Notifier { - fn wake_by_ref(arc_self: &Arc) { - #[cfg(feature = "std")] - let wakers = &mut *arc_self.wakers.lock().unwrap(); - #[cfg(not(feature = "std"))] - let wakers = &mut *arc_self.wakers.lock(); - - if let Some(wakers) = wakers.as_mut() { - for (_key, opt_waker) in wakers { - if let Some(waker) = opt_waker.take() { - waker.wake(); - } - } + if let Some(ref inner) = self.inner { + inner.notifier.unregister(self.waker_key) } } } @@ -425,6 +341,6 @@ impl WeakShared { /// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled /// to completion. pub fn upgrade(&self) -> Option> { - Some(Shared { inner: Some(self.0.upgrade()?), waker_key: NULL_WAKER_KEY }) + Some(Shared { inner: Some(self.0.upgrade()?), waker_key: WakerKey::NULL }) } } diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 93d7c5e2ff..8652dde577 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -328,3 +328,5 @@ mod abortable; mod fns; mod unfold_state; +#[cfg(feature = "std")] +mod wakerset; diff --git a/futures-util/src/wakerset.rs b/futures-util/src/wakerset.rs new file mode 100644 index 0000000000..3813b8138e --- /dev/null +++ b/futures-util/src/wakerset.rs @@ -0,0 +1,120 @@ +use crate::task::{ArcWake, Waker}; +use alloc::sync::Arc; +use core::fmt; +use futures_core::task::Context; +use slab::Slab; + +#[cfg(feature = "std")] +type Mutex = std::sync::Mutex; +#[cfg(not(feature = "std"))] +type Mutex = spin::Mutex; + +/// Type for keys into the waker set +#[derive(Copy, Clone, PartialEq, Eq)] +pub(crate) struct WakerKey(usize); + +impl WakerKey { + /// null value for a waker key + pub(crate) const NULL: WakerKey = WakerKey(usize::MAX); +} + +impl fmt::Debug for WakerKey { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(fmt) + } +} + +/// An object that contains a set of wakers that can be notified +/// together. +/// +/// Useful for implementing shared futures and streams +pub(crate) struct WakerSet { + wakers: Mutex>>>, +} + +impl WakerSet { + pub(crate) fn new() -> Self { + Self { wakers: Mutex::new(Some(Slab::new())) } + } + + /// Registers the current task to receive a wakeup when we are awoken. + pub(crate) fn record_waker(&self, key: &mut WakerKey, cx: &mut Context<'_>) { + let mut wakers_guard = self.wakers(); + + let wakers = match wakers_guard.as_mut() { + Some(wakers) => wakers, + None => return, + }; + let new_waker = cx.waker(); + + if *key == WakerKey::NULL { + *key = WakerKey(wakers.insert(Some(new_waker.clone()))); + } else { + match wakers[key.0] { + Some(ref old_waker) if new_waker.will_wake(old_waker) => {} + // Could use clone_from here, but Waker doesn't specialize it. + ref mut slot => *slot = Some(new_waker.clone()), + } + } + debug_assert!(*key != WakerKey::NULL); + } + + pub(crate) fn unregister(&self, key: WakerKey) { + if key == WakerKey::NULL { + return; + } + #[cfg(feature = "std")] + if let Ok(mut wakers) = self.wakers.lock() { + if let Some(wakers) = wakers.as_mut() { + wakers.remove(key.0); + } + } + #[cfg(not(feature = "std"))] + if let Some(wakers) = self.wakers.lock().as_mut() { + wakers.remove(self.waker_key); + } + } + + /// Wake all registered wakers + pub(crate) fn wake_all(&self) { + let wakers = &mut *self.wakers(); + + if let Some(wakers) = wakers.as_mut() { + for (_key, opt_waker) in wakers { + if let Some(waker) = opt_waker.take() { + waker.wake() + } + } + } + } + + /// Wake all registered wakers and stop notifying + /// + /// After waking existing wakers, this drops the underlying buffer, and will + /// no longer notify more wakers. + pub(crate) fn wake_and_finish(&self) { + let mut wakers_guard = self.wakers(); + + if let Some(mut wakers) = wakers_guard.take() { + for waker in wakers.drain().flatten() { + waker.wake(); + } + } + } + + #[cfg(feature = "std")] + fn wakers(&self) -> std::sync::MutexGuard<'_, Option>>> { + self.wakers.lock().unwrap() + } + + #[cfg(not(feature = "std"))] + fn wakers(&self) -> spin::mutex::MutexGuard<'_, Option>>> { + self.wakers.lock() + } +} + +impl ArcWake for WakerSet { + fn wake_by_ref(arc_self: &Arc) { + arc_self.wake_all(); + } +} From c5663e33f12430822613422f333f18b615afebd2 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Thu, 8 Nov 2018 22:24:28 -0700 Subject: [PATCH 02/10] Implement a shared Stream --- futures-util/src/stream/mod.rs | 2 +- futures-util/src/stream/stream/mod.rs | 72 ++++ futures-util/src/stream/stream/shared.rs | 432 +++++++++++++++++++++++ futures/tests/stream_shared.rs | 88 +++++ 4 files changed, 593 insertions(+), 1 deletion(-) create mode 100644 futures-util/src/stream/stream/shared.rs create mode 100644 futures/tests/stream_shared.rs diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 789e1ad221..8ed8e0129d 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -25,7 +25,7 @@ pub use self::stream::{ }; #[cfg(feature = "std")] -pub use self::stream::CatchUnwind; +pub use self::stream::{CatchUnwind, Shared}; #[cfg(feature = "alloc")] pub use self::stream::Chunks; diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index ee30f8da6f..baf79c9145 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -218,6 +218,12 @@ mod catch_unwind; #[cfg(feature = "std")] pub use self::catch_unwind::CatchUnwind; +#[cfg(feature = "std")] +mod shared; +#[cfg(feature = "std")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::shared::Shared; + impl StreamExt for T where T: Stream {} /// An extension trait for `Stream`s that provides a variety of convenient @@ -1469,6 +1475,72 @@ pub trait StreamExt: Stream { assert_stream::(Box::pin(self)) } + /// Create a cloneable handle to this stream where all handles will resolve + /// to the same result. + /// + /// The shared() method provides a method to convert any stream into a + /// cloneable stream. It enables a stream to be polled by multiple threads. + /// + /// This method is only available when the `std` feature of this library is + /// activiated, and it is activated by default. + /// + /// # Panics + /// If the capacity is zero. It must have space for at least one item. + /// + /// # Examples + /// + /// ``` + /// use futures::executor::block_on; + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let shared1 = stream.shared(4); + /// let shared2 = shared1.clone(); + /// + /// assert_eq!(vec![1,2,3], block_on(shared1.collect::>())); + /// assert_eq!(vec![1,2,3], block_on(shared2.collect::>())); + /// ``` + /// + /// ``` + /// use futures::executor::block_on; + /// use futures::stream::{self, StreamExt}; + /// use std::thread; + /// + /// let stream = stream::iter(1..=3); + /// let shared1 = stream.shared(4); + /// let shared2 = shared1.clone(); + /// let join_handle = thread::spawn(move || { + /// assert_eq!(vec![1,2,3], block_on(shared2.collect::>())); + /// }); + /// assert_eq!(vec![1,2,3], block_on(shared1.collect::>())); + /// join_handle.join().unwrap(); + /// ``` + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(vec![1,2,3]); + /// let mut shared1 = stream.shared(4); + /// + /// assert_eq!(Some(1), shared1.next().await); + /// + /// let mut shared2 = shared1.clone(); + /// assert_eq!(Some(2), shared2.next().await); + /// assert_eq!(Some(3), shared2.next().await); + /// assert_eq!(vec![2,3], shared1.collect::>().await); + /// assert_eq!(None, shared2.next().await); + /// # }); + /// ``` + #[cfg(feature = "std")] + fn shared(self, capacity: usize) -> Shared + where + Self: Sized, + Self::Item: Clone, + { + Shared::new(self, capacity) + } + /// An adaptor for creating a buffered list of pending futures. /// /// If this stream's item can be converted into a future, then this adaptor diff --git a/futures-util/src/stream/stream/shared.rs b/futures-util/src/stream/stream/shared.rs new file mode 100644 index 0000000000..9a8744a6c5 --- /dev/null +++ b/futures-util/src/stream/stream/shared.rs @@ -0,0 +1,432 @@ +use crate::task::{waker_ref, ArcWake}; +use crate::wakerset::{WakerKey, WakerSet}; +use alloc::boxed::Box; +use alloc::fmt; +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::cell::UnsafeCell; +use core::marker::Unpin; +use core::pin::Pin; +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; + +/// A stream that is cloneable and can be polled in multiple threads. +/// Use the [`shared`](crate::StreamExt::shared) combinator method to convert +/// any stream into a `Shared` sream. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Shared { + /// Current index into the buffer + idx: usize, + waker_key: WakerKey, + inner: Arc>, +} + +struct Slot { + /// This contains both the number of receivers on the current slot, + /// as well as if the slot is full. + /// The least significant bit is the full state, the rest is the refcount + state: AtomicUsize, + val: UnsafeCell>, +} + +const REFCOUNT_MASK: usize = usize::MAX >> 1; +const FILLED: usize = usize::MAX ^ REFCOUNT_MASK; + +impl Default for Slot { + fn default() -> Self { + Slot { state: AtomicUsize::new(0), val: UnsafeCell::new(None) } + } +} + +struct Inner { + buffer: Box<[Slot]>, + stream: UnsafeCell, // Should this be an Option, so we can drop it if we hit the end? + poll_state: AtomicUsize, + notifier: Arc, +} + +unsafe impl Send for Inner +where + S: Stream + Send, + S::Item: Send + Sync, +{ +} + +unsafe impl Sync for Inner +where + S: Stream + Send, + S::Item: Send + Sync, +{ +} + +const IDLE: usize = 0; +const POLLING: usize = 1; +const POISONED: usize = 2; + +// The stream itself is polled behind the `Arc`, so it won't be moved +// when `Shared` is moved. +impl Unpin for Shared {} + +impl fmt::Debug for Inner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Inner").field("capacity", &(self.buffer.len() - 1)).finish() + } +} + +unsafe impl Send for Slot where T: Send {} +unsafe impl Sync for Slot where T: Send + Sync {} + +impl Shared { + pub(super) fn new(stream: S, cap: usize) -> Shared { + assert!(cap > 0, "Shared stream must have capacity of at least 1"); + // Need an extra empty element for ring buffer + let cap = cap + 1; + assert!(cap < TERMINATED, "Capacity too large"); + let mut buffer = Vec::with_capacity(cap); + // The first slot will initially have one reference to it. + buffer.push(Slot { + // Initial state is empty, with 1 reference + state: AtomicUsize::new(1), + val: UnsafeCell::new(None), + }); + + // The rest will initially be empty + for _ in 0..cap { + buffer.push(Slot::default()); + } + + Shared { + idx: 0, + waker_key: WakerKey::NULL, + inner: Arc::new(Inner { + buffer: buffer.into(), + stream: UnsafeCell::new(stream), + poll_state: AtomicUsize::new(IDLE), + notifier: Arc::new(WakerSet::new()), + }), + } + } +} + +/// If the idx is set to this, then the stream has reached the end. +const TERMINATED: usize = usize::MAX; + +impl Shared +where + S::Item: Clone, +{ + fn take_next(&mut self) -> Option { + // FIXME: should we stop moving if we reach the end of the + // stream? + let (result, idx) = self.inner.take(self.idx); + self.idx = idx; + result + } + + #[inline] + fn slot(&self) -> &Slot { + self.inner.slot(self.idx) + } + + fn poll_stream(&mut self) -> Poll> { + if self.inner.fill_buffer(self.idx) { + debug_assert!(self.slot().state.load(Relaxed) & FILLED != 0); + Poll::Ready(self.take_next()) + } else { + Poll::Pending + } + } +} + +impl Stream for Shared +where + S::Item: Clone, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = &mut *self; + if this.is_terminated() { + return Poll::Ready(None); + } + if this.slot().is_full() { + Poll::Ready(this.take_next()) + } else { + this.inner.record_waker(&mut this.waker_key, cx); + this.poll_stream() + } + } +} + +impl FusedStream for Shared +where + S::Item: Clone, +{ + fn is_terminated(&self) -> bool { + self.idx == TERMINATED + } +} + +impl Clone for Shared +where + S::Item: Clone, +{ + fn clone(&self) -> Self { + let inner = self.inner.clone(); + assert!( + Arc::strong_count(&inner) < REFCOUNT_MASK, + "Created too many clones of shared stream" + ); + self.slot().inc_refcount(); + Shared { idx: self.idx, waker_key: WakerKey::NULL, inner } + } +} + +impl Drop for Shared { + fn drop(&mut self) { + self.inner.notifier.unregister(self.waker_key); + self.inner.drop_ref(self.idx); + } +} + +impl Inner { + #[inline] + fn cap(&self) -> usize { + self.buffer.len() + } + + fn prev_idx(&self, idx: usize) -> usize { + match idx.checked_sub(1) { + Some(prev) => prev % self.cap(), + None => self.cap() - 1, + } + } + fn next_idx(&self, idx: usize) -> usize { + (idx + 1) % self.cap() + } + + fn slot(&self, idx: usize) -> &Slot { + &self.buffer[idx] + } + + #[inline] + fn is_first(&self, idx: usize) -> bool { + let state = self.buffer[self.prev_idx(idx)].state.load(Acquire); + state & FILLED == 0 + } + + /// Notify any waiting tasks that there is space available. + fn notify_waiting(&self, idx: usize) { + let slot = &self.buffer[self.prev_idx(idx)]; + let state = slot.state.load(Relaxed); + if state & FILLED == 0 && state & REFCOUNT_MASK != 0 { + ArcWake::wake_by_ref(&self.notifier); + } + } + + fn drop_ref(&self, idx: usize) { + // drop_ref might be called when the idx is + // TERMINATED, because it reached the end. + if idx > self.buffer.len() { + return; + } + let slot = &self.buffer[idx]; + let old = slot.state.fetch_sub(1, Release); + let refcount = old & REFCOUNT_MASK; + debug_assert!(refcount > 0); + + // If we are the last reference to the first slot, then drop the slot. + if refcount == 1 && self.is_first(idx) { + // This is safe, because nothing else can access this slot if we are the last + // thing to use it. + unsafe { *slot.val.get() = None }; + // state is empty with no references + slot.state.store(0, Release); + self.notify_waiting(idx); + } + } + + fn record_waker(&self, waker_key: &mut WakerKey, cx: &mut Context<'_>) { + self.notifier.record_waker(waker_key, cx); + } + + /// Attempt to get a lock for polling + /// + /// Returns Ok with a lock guard if able to optain a lock + /// and Err if the lock is already held. + /// + /// Panics if the state is poisoned + fn try_lock(&self) -> Result, ()> { + match self.poll_state.compare_exchange(IDLE, POLLING, Acquire, Relaxed) { + Ok(_) => Ok(LockGuard { state: &self.poll_state, did_not_panic: true }), + Err(POLLING) => Err(()), + Err(POISONED) => panic!("inner stream panicked during poll"), + _ => unreachable!(), + } + } + + /// Try polling the upstream stream to fill the buffer. + /// + /// This will keep polling until we either run out of space + /// or the upstream stream is no longer ready. This reduces + /// context switches. + /// + /// Returns true if at least one slot was filled (including by another task), false otherwise + fn fill_buffer(&self, mut idx: usize) -> bool { + let mut slot = self.slot(idx); + let mut lock = match self.try_lock() { + Ok(lock) => lock, + // Even if we didn't get the lock, the slot might have been filled already + Err(_) => return slot.is_full(), + }; + // Check that the slot wasn't filled between the last check and possibly getting the lock + if slot.is_full() { + return true; + } + + let waker = waker_ref(&self.notifier); + let mut cx = Context::from_waker(&waker); + + let mut filled_one = false; + + loop { + let next_idx = self.next_idx(idx); + let next_slot = &self.buffer[next_idx]; + // Use relaxed ordering because, we don't actually read the value of the next slot + if next_slot.state.load(Relaxed) & FILLED != 0 { + // We have filled the buffer, return whether we + // filled any other slots + break; + } + let item = { + // Safety: + // Pin::new_unchecked is safe because the stream is inside an Arc, and is + // never moved out of it. + // dereferencing self.inner.stream.get() is safe because we currently hold a lock + // from try_start_poll() + let stream = unsafe { Pin::new_unchecked(&mut *self.stream.get()) }; + let res = lock.with_panic_check(|| stream.poll_next(&mut cx)); + match res { + Poll::Pending => { + return filled_one; + } + Poll::Ready(item) => item, + } + }; + let is_end = item.is_none(); + // Safety: + // We have a lock that prevents any concurrent writes to any slots. And the full + // state is currently false, so nothing else can be reading from this slot. + unsafe { slot.fill(item) }; + filled_one = true; + + if is_end { + drop(lock); + self.notifier.wake_and_finish(); + return filled_one; + } + + idx = next_idx; + slot = next_slot; + } + if filled_one { + self.notifier.wake_all(); + drop(lock); + } + filled_one + } +} + +impl Inner +where + S::Item: Clone, +{ + fn take(&self, idx: usize) -> (Option, usize) { + let slot = &self.buffer[idx]; + // if we've gotten this far, the slot is already filled. + let state = slot.state.load(Acquire); + debug_assert!(state & FILLED != 0, "Attempt to read from unfilled slot"); + // This is valid because the previous buffer shouldn't be written to + // until this one is empty. + let value = if state == FILLED & 1 && self.is_first(idx) { + // This is safe because there is only a single + // stream that still has access to this slot, and it + // is the one currently taking a value out. + let result = unsafe { (*slot.val.get()).take() }; + slot.state.store(0, Release); + self.notify_waiting(idx); + result + } else { + // This is safe because nothing else should write to this until + // it is the first slot, and it is empty. + let result = unsafe { (*slot.val.get()).clone() }; + self.drop_ref(idx); + result + }; + if value.is_some() { + let next_idx = self.next_idx(idx); + self.buffer[next_idx].inc_refcount(); + (value, next_idx) + } else { + // we've reached the end of the stream, so fuse the outer stream + (value, TERMINATED) + } + } +} + +impl Slot { + fn is_full(&self) -> bool { + self.state.load(Acquire) & FILLED != 0 + } + + fn inc_refcount(&self) { + let old = self.state.fetch_add(1, Relaxed); + assert!(old & REFCOUNT_MASK != REFCOUNT_MASK); + } + + /// Store an item in the slot + /// + /// Safety: + /// This function is only safe to call if it is safe to mutate the slot. + /// That is if the caller has exclusive access to this slot. + unsafe fn fill(&self, item: Option) { + // Safety: + // Having exclusive access on the slot is a condition of + // calling this method safely. + unsafe { + *self.val.get() = item; + } + let old = self.state.fetch_or(FILLED, Release); + debug_assert!(old & FILLED == 0); + } +} + +struct LockGuard<'a> { + state: &'a AtomicUsize, + did_not_panic: bool, +} + +impl LockGuard<'_> { + fn with_panic_check(&mut self, f: F) -> T + where + F: FnOnce() -> T, + { + self.did_not_panic = false; + let res = f(); + self.did_not_panic = true; + res + } +} + +impl Drop for LockGuard<'_> { + fn drop(&mut self) { + if self.did_not_panic { + self.state.store(IDLE, Release) + } else { + self.state.store(POISONED, Release) + } + } +} diff --git a/futures/tests/stream_shared.rs b/futures/tests/stream_shared.rs new file mode 100644 index 0000000000..59297b331c --- /dev/null +++ b/futures/tests/stream_shared.rs @@ -0,0 +1,88 @@ +use futures::channel::mpsc; +use futures::executor::{block_on, block_on_stream}; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; +use std::cell::Cell; +use std::rc::Rc; +use std::thread; + +struct CountClone(Rc>); + +impl Clone for CountClone { + fn clone(&self) -> Self { + self.0.set(self.0.get() + 1); + Self(self.0.clone()) + } +} + +fn send_shared_and_wait_on_multiple_threads(threads_number: u32) { + let items = [6, 9, 200, 3, 4, 192, 54]; + let (mut tx, rx) = mpsc::channel::(3); + let join_handles = { + let s = rx.shared(4); + (0..threads_number) + .map(|_| { + let mut cloned_stream = s.clone(); + thread::spawn(move || { + for i in items { + assert_eq!(block_on(cloned_stream.next()).unwrap(), i); + } + }) + }) + .collect::>() + }; + + for i in items { + block_on(tx.send(i)).unwrap(); + } + tx.close_channel(); + + for join_handle in join_handles { + join_handle.join().unwrap(); + } +} + +#[test] +fn one_thread() { + send_shared_and_wait_on_multiple_threads(1); +} + +#[test] +fn two_threads() { + send_shared_and_wait_on_multiple_threads(2); +} + +#[test] +fn many_threads() { + send_shared_and_wait_on_multiple_threads(1000); +} + +#[test] +fn drop_on_one_task_ok() { + let (mut tx, rx) = mpsc::channel::(2); + let s1 = rx.shared(2); + let s2 = s1.clone(); + + let (mut tx2, rx2) = mpsc::channel::(2); + + let t1 = thread::spawn(|| { + let f = stream::select(s1, rx2).take(2).collect::>(); + drop(block_on(f)); + }); + + let (tx3, rx3) = mpsc::channel::(2); + let t2 = thread::spawn(move || { + let _ = block_on(s2.forward(tx3)); + }); + + block_on(tx.send(42)).unwrap(); + block_on(tx2.send(11)).unwrap(); // cancel s1 + drop(tx2); + t1.join().unwrap(); + + block_on(tx.send(43)).unwrap(); + drop(tx); + let result: Vec<_> = block_on_stream(rx3).collect(); + assert_eq!(result, [42, 43]); + t2.join().unwrap(); +} From a7ae1d89b8c82ae73b0f4bc8b3cf987442dcf8e7 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Wed, 8 Mar 2023 00:42:18 -0700 Subject: [PATCH 03/10] Add more tests for shared streams And fix bug with wrong operator --- futures-util/src/stream/stream/shared.rs | 2 +- futures/tests/stream_shared.rs | 98 +++++++++++++++++++++++- 2 files changed, 97 insertions(+), 3 deletions(-) diff --git a/futures-util/src/stream/stream/shared.rs b/futures-util/src/stream/stream/shared.rs index 9a8744a6c5..f30d71cf96 100644 --- a/futures-util/src/stream/stream/shared.rs +++ b/futures-util/src/stream/stream/shared.rs @@ -351,7 +351,7 @@ where debug_assert!(state & FILLED != 0, "Attempt to read from unfilled slot"); // This is valid because the previous buffer shouldn't be written to // until this one is empty. - let value = if state == FILLED & 1 && self.is_first(idx) { + let value = if state == FILLED | 1 && self.is_first(idx) { // This is safe because there is only a single // stream that still has access to this slot, and it // is the one currently taking a value out. diff --git a/futures/tests/stream_shared.rs b/futures/tests/stream_shared.rs index 59297b331c..d0973bbf37 100644 --- a/futures/tests/stream_shared.rs +++ b/futures/tests/stream_shared.rs @@ -1,10 +1,13 @@ use futures::channel::mpsc; use futures::executor::{block_on, block_on_stream}; +use futures::future; use futures::sink::SinkExt; -use futures::stream::{self, StreamExt}; -use std::cell::Cell; +use futures::stream::{self, LocalBoxStream, StreamExt}; +use std::cell::{Cell, RefCell}; use std::rc::Rc; +use std::panic::AssertUnwindSafe; use std::thread; +use std::task::Poll; struct CountClone(Rc>); @@ -86,3 +89,94 @@ fn drop_on_one_task_ok() { assert_eq!(result, [42, 43]); t2.join().unwrap(); } + +#[test] +fn drop_in_poll() { + let slot1 = Rc::new(RefCell::new(None)); + let slot2 = slot1.clone(); + + let mut stream1 = stream::once(future::lazy(move |_| { + slot2.replace(None); + 1 + })) + .shared(1); + + let stream2: LocalBoxStream<_> = Box::pin(stream1.clone()); + slot1.replace(Some(stream2)); + + assert_eq!(block_on(stream1.next()), Some(1)); +} + +#[test] +fn dont_clone_in_single_owner_shared_stream() { + let counter = CountClone(Rc::new(Cell::new(0))); + let (mut tx, rx) = mpsc::channel(2); + + let mut rx = rx.shared(1); + + block_on(tx.send(counter)).unwrap(); + + assert_eq!(block_on(rx.next()).unwrap().0.get(), 0); +} + +#[test] +fn dont_do_unnecessary_clones_on_output() { + let counter = CountClone(Rc::new(Cell::new(0))); + let (mut tx, rx) = mpsc::channel(2); + + let mut rx = rx.shared(1); + + block_on(tx.send(counter)).unwrap(); + + assert_eq!(block_on(rx.clone().next()).unwrap().0.get(), 1); + assert_eq!(block_on(rx.clone().next()).unwrap().0.get(), 2); + assert_eq!(block_on(rx.next()).unwrap().0.get(), 2); +} + +#[test] +fn shared_stream_that_wakes_itself_until_pending_is_returned() { + let proceed = Cell::new(false); + let mut stream = stream::poll_fn(|cx| { + if proceed.get() { + Poll::Ready(Some(())) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + .shared(3); + + assert_eq!(block_on(future::join(stream.next(), async { proceed.set(true) })), (Some(()), ())); +} + +#[test] +#[should_panic(expected = "inner stream panicked during poll")] +fn panic_while_poll() { + let mut stream = stream::poll_fn::(|_| panic!("test")).shared(1); + + let mut stream_captured = stream.clone(); + std::panic::catch_unwind(AssertUnwindSafe(|| { + block_on(stream_captured.next()); + })).unwrap_err(); + + block_on(stream.next()); +} + +#[test] +#[should_panic(expected = "test_marker")] +fn poll_while_panic() { + struct S; + + impl Drop for S { + fn drop(&mut self) { + let mut stream = stream::repeat(1).shared(2); + assert_eq!(block_on(stream.clone().next()), Some(1)); + assert_eq!(block_on(stream.next()), Some(1)); + } + } + + let _s = S {}; + + panic!("test_marker"); +} + From 677a6dfbeb2eed0ae62d58bcaba27247650d4512 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Wed, 8 Mar 2023 01:09:20 -0700 Subject: [PATCH 04/10] Fix race condition in shared stream --- futures-util/src/stream/stream/shared.rs | 22 ++++++++++++---------- futures/tests/stream_shared.rs | 8 ++++---- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/futures-util/src/stream/stream/shared.rs b/futures-util/src/stream/stream/shared.rs index f30d71cf96..fde71eedfb 100644 --- a/futures-util/src/stream/stream/shared.rs +++ b/futures-util/src/stream/stream/shared.rs @@ -79,6 +79,9 @@ impl fmt::Debug for Inner { unsafe impl Send for Slot where T: Send {} unsafe impl Sync for Slot where T: Send + Sync {} +/// If the idx is set to this, then the stream has reached the end. +const TERMINATED: usize = usize::MAX; + impl Shared { pub(super) fn new(stream: S, cap: usize) -> Shared { assert!(cap > 0, "Shared stream must have capacity of at least 1"); @@ -111,16 +114,11 @@ impl Shared { } } -/// If the idx is set to this, then the stream has reached the end. -const TERMINATED: usize = usize::MAX; - impl Shared where S::Item: Clone, { fn take_next(&mut self) -> Option { - // FIXME: should we stop moving if we reach the end of the - // stream? let (result, idx) = self.inner.take(self.idx); self.idx = idx; result @@ -200,7 +198,7 @@ impl Inner { fn prev_idx(&self, idx: usize) -> usize { match idx.checked_sub(1) { - Some(prev) => prev % self.cap(), + Some(prev) => prev, None => self.cap() - 1, } } @@ -234,7 +232,7 @@ impl Inner { return; } let slot = &self.buffer[idx]; - let old = slot.state.fetch_sub(1, Release); + let old = slot.state.fetch_sub(1, Relaxed); let refcount = old & REFCOUNT_MASK; debug_assert!(refcount > 0); @@ -349,9 +347,14 @@ where // if we've gotten this far, the slot is already filled. let state = slot.state.load(Acquire); debug_assert!(state & FILLED != 0, "Attempt to read from unfilled slot"); + + // We need to increment the next slot first, to ensure that another thread + // doesn't think it owns the next slot after we clear this slot. + let next_idx = self.next_idx(idx); + self.buffer[next_idx].inc_refcount(); // This is valid because the previous buffer shouldn't be written to // until this one is empty. - let value = if state == FILLED | 1 && self.is_first(idx) { + let value = if state == (FILLED | 1) && self.is_first(idx) { // This is safe because there is only a single // stream that still has access to this slot, and it // is the one currently taking a value out. @@ -367,11 +370,10 @@ where result }; if value.is_some() { - let next_idx = self.next_idx(idx); - self.buffer[next_idx].inc_refcount(); (value, next_idx) } else { // we've reached the end of the stream, so fuse the outer stream + self.drop_ref(next_idx); (value, TERMINATED) } } diff --git a/futures/tests/stream_shared.rs b/futures/tests/stream_shared.rs index d0973bbf37..9a98449538 100644 --- a/futures/tests/stream_shared.rs +++ b/futures/tests/stream_shared.rs @@ -4,10 +4,10 @@ use futures::future; use futures::sink::SinkExt; use futures::stream::{self, LocalBoxStream, StreamExt}; use std::cell::{Cell, RefCell}; -use std::rc::Rc; use std::panic::AssertUnwindSafe; -use std::thread; +use std::rc::Rc; use std::task::Poll; +use std::thread; struct CountClone(Rc>); @@ -157,7 +157,8 @@ fn panic_while_poll() { let mut stream_captured = stream.clone(); std::panic::catch_unwind(AssertUnwindSafe(|| { block_on(stream_captured.next()); - })).unwrap_err(); + })) + .unwrap_err(); block_on(stream.next()); } @@ -179,4 +180,3 @@ fn poll_while_panic() { panic!("test_marker"); } - From d87e4f201e3497e979b82d7c3e36d0f9df9de7f1 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Sat, 11 Mar 2023 23:34:08 -0700 Subject: [PATCH 05/10] Fix data races in shared stream I don't completely understand why the stronger ordering is needed, but it seems to fix the data races detected by the thread sanitizer. --- futures-util/src/stream/stream/shared.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/futures-util/src/stream/stream/shared.rs b/futures-util/src/stream/stream/shared.rs index fde71eedfb..cca91fc5f6 100644 --- a/futures-util/src/stream/stream/shared.rs +++ b/futures-util/src/stream/stream/shared.rs @@ -8,7 +8,7 @@ use core::cell::UnsafeCell; use core::marker::Unpin; use core::pin::Pin; use core::sync::atomic::AtomicUsize; -use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -232,7 +232,7 @@ impl Inner { return; } let slot = &self.buffer[idx]; - let old = slot.state.fetch_sub(1, Relaxed); + let old = slot.state.fetch_sub(1, AcqRel); let refcount = old & REFCOUNT_MASK; debug_assert!(refcount > 0); @@ -385,7 +385,7 @@ impl Slot { } fn inc_refcount(&self) { - let old = self.state.fetch_add(1, Relaxed); + let old = self.state.fetch_add(1, Release); assert!(old & REFCOUNT_MASK != REFCOUNT_MASK); } From 7b70d5821104fea63f2eb5e1ba90e5e5add72410 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Mon, 13 Mar 2023 01:28:11 -0600 Subject: [PATCH 06/10] Add more tests for shared streams --- futures-util/src/stream/stream/mod.rs | 2 ++ futures/tests/stream_shared.rs | 25 +++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index baf79c9145..9ad8fd000b 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -1487,6 +1487,8 @@ pub trait StreamExt: Stream { /// # Panics /// If the capacity is zero. It must have space for at least one item. /// + /// If the capacity is too large. The maximum size may be les than `usize::MAX`. + /// /// # Examples /// /// ``` diff --git a/futures/tests/stream_shared.rs b/futures/tests/stream_shared.rs index 9a98449538..e867fa5cdd 100644 --- a/futures/tests/stream_shared.rs +++ b/futures/tests/stream_shared.rs @@ -180,3 +180,28 @@ fn poll_while_panic() { panic!("test_marker"); } + +#[test] +#[should_panic(expected = "Shared stream must have capacity of at least 1")] +fn panic_for_zero_capacity() { + let _ = stream::empty::().shared(0); +} + +#[test] +fn empty_stream() { + let mut s1 = stream::empty::().shared(1); + let mut s2 = s1.clone(); + + assert!(block_on(s1.next()).is_none()); + assert!(block_on(s2.next()).is_none()); + assert!(block_on(s1.clone().next()).is_none()); +} + +#[test] +fn fused() { + use stream::FusedStream; + let mut s = stream::empty::().shared(1); + + assert!(block_on(s.next()).is_none()); + assert!(s.is_terminated()); +} From 890e5863151e98d6c162736c05d5ecc55526c65b Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Mon, 13 Mar 2023 01:31:33 -0600 Subject: [PATCH 07/10] Don't increment slot for terminated shared stream --- futures-util/src/stream/stream/shared.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/futures-util/src/stream/stream/shared.rs b/futures-util/src/stream/stream/shared.rs index cca91fc5f6..05358828e6 100644 --- a/futures-util/src/stream/stream/shared.rs +++ b/futures-util/src/stream/stream/shared.rs @@ -178,7 +178,9 @@ where Arc::strong_count(&inner) < REFCOUNT_MASK, "Created too many clones of shared stream" ); - self.slot().inc_refcount(); + if !self.is_terminated() { + self.slot().inc_refcount(); + } Shared { idx: self.idx, waker_key: WakerKey::NULL, inner } } } From fe7be269f56233ef3cf7988b2d0ccecdaca8843f Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Thu, 16 Mar 2023 01:35:19 -0600 Subject: [PATCH 08/10] Fix ordering for loading state of next slot while filling in shared stream --- futures-util/src/stream/stream/shared.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/futures-util/src/stream/stream/shared.rs b/futures-util/src/stream/stream/shared.rs index 05358828e6..a657f5aabf 100644 --- a/futures-util/src/stream/stream/shared.rs +++ b/futures-util/src/stream/stream/shared.rs @@ -295,8 +295,9 @@ impl Inner { loop { let next_idx = self.next_idx(idx); let next_slot = &self.buffer[next_idx]; - // Use relaxed ordering because, we don't actually read the value of the next slot - if next_slot.state.load(Relaxed) & FILLED != 0 { + // Use acquire ordering, so that we don't try writing to the next slot until + // we can see that it has been emptied. + if next_slot.state.load(Acquire) & FILLED != 0 { // We have filled the buffer, return whether we // filled any other slots break; From dc16318c5c8a4a5718515b58e5efba973f9475b2 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Sun, 26 Apr 2026 23:31:22 -0600 Subject: [PATCH 09/10] Fix builds for shared streams --- futures-util/src/lib.rs | 2 +- futures-util/src/stream/mod.rs | 5 ++++- futures-util/src/stream/stream/mod.rs | 5 ++--- futures/tests/stream_shared.rs | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 8652dde577..957780df4e 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -328,5 +328,5 @@ mod abortable; mod fns; mod unfold_state; -#[cfg(feature = "std")] +#[cfg(feature = "alloc")] mod wakerset; diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 8ed8e0129d..231b5b7a26 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -25,7 +25,7 @@ pub use self::stream::{ }; #[cfg(feature = "std")] -pub use self::stream::{CatchUnwind, Shared}; +pub use self::stream::CatchUnwind; #[cfg(feature = "alloc")] pub use self::stream::Chunks; @@ -33,6 +33,9 @@ pub use self::stream::Chunks; #[cfg(feature = "alloc")] pub use self::stream::ReadyChunks; +#[cfg(feature = "alloc")] +pub use self::stream::Shared; + #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::stream::Forward; diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 9ad8fd000b..b14f13a98f 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -218,10 +218,9 @@ mod catch_unwind; #[cfg(feature = "std")] pub use self::catch_unwind::CatchUnwind; -#[cfg(feature = "std")] +#[cfg(feature = "alloc")] mod shared; -#[cfg(feature = "std")] -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +#[cfg(feature = "alloc")] pub use self::shared::Shared; impl StreamExt for T where T: Stream {} diff --git a/futures/tests/stream_shared.rs b/futures/tests/stream_shared.rs index e867fa5cdd..76b3e4b5bd 100644 --- a/futures/tests/stream_shared.rs +++ b/futures/tests/stream_shared.rs @@ -101,7 +101,7 @@ fn drop_in_poll() { })) .shared(1); - let stream2: LocalBoxStream<_> = Box::pin(stream1.clone()); + let stream2: LocalBoxStream<'_, _> = Box::pin(stream1.clone()); slot1.replace(Some(stream2)); assert_eq!(block_on(stream1.next()), Some(1)); From 8250cba410fe7555580228845090b71d6adbe7d3 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Sun, 3 May 2026 00:56:28 -0600 Subject: [PATCH 10/10] Fix conditions for wakerset --- futures-util/src/lib.rs | 2 +- futures-util/src/stream/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 957780df4e..a8cf324b40 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -328,5 +328,5 @@ mod abortable; mod fns; mod unfold_state; -#[cfg(feature = "alloc")] +#[cfg(any(feature = "std", all(feature = "alloc", feature = "spin")))] mod wakerset; diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 231b5b7a26..01c2a7dc76 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -33,7 +33,7 @@ pub use self::stream::Chunks; #[cfg(feature = "alloc")] pub use self::stream::ReadyChunks; -#[cfg(feature = "alloc")] +#[cfg(any(feature = "std", all(feature = "alloc", feature = "spin")))] pub use self::stream::Shared; #[cfg(feature = "sink")]