Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ Common utilities and extension traits for the futures-rs library.
default = ["std", "async-await", "async-await-macro"]
std = ["alloc", "futures-core/std", "futures-task/std", "slab/std"]
alloc = ["futures-core/alloc", "futures-task/alloc", "slab"]
portable-atomic-alloc = ["portable-atomic-util/alloc", "portable-atomic"]
async-await = []
async-await-macro = ["async-await", "futures-macro"]
compat = ["std", "futures_01", "libc"]
io-compat = ["io", "compat", "tokio-io", "libc"]
sink = ["futures-sink"]
io = ["std", "futures-io", "memchr"]
channel = ["std", "futures-channel"]
portable-atomic = ["futures-core/portable-atomic"]
portable-atomic = ["futures-core/portable-atomic", "portable-atomic-util", "portable_atomic_crate"]

# Unstable features
# These features are outside of the normal semver guarantees and require the
Expand All @@ -43,11 +44,18 @@ memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-project-lite = "0.2.6"
spin = { version = "0.12.0", optional = true }
spin = { version = "0.10.0", optional = true }
portable-atomic-util = { version = "0.2.7", default-features = false, optional = true }

# INDIRECT DEPENDENCYS BUT ONLY FOR SPECIFIC MINIMAL VERSIONS
libc = { version = "0.2.26", optional = true }

[dependencies.portable_atomic_crate]
package = "portable-atomic"
version = "1.13.1"
default-features = false
optional = true

[dev-dependencies]
futures = { path = "../futures", features = ["async-await", "thread-pool"] }
futures-test = { path = "../futures-test" }
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/futures_unordered/iter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::task::Task;
use super::Arc;
use super::FuturesUnordered;
use alloc::sync::Arc;
use core::marker::PhantomData;
use core::pin::Pin;
use core::ptr;
Expand Down
17 changes: 14 additions & 3 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,28 @@
//! This module is only available when the `std` or `alloc` feature of this
//! library is activated, and it is activated by default.

use crate::task::AtomicWaker;
#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;

#[cfg(not(feature = "portable-atomic-alloc"))]
use alloc::sync::{Arc, Weak};

#[cfg(feature = "portable-atomic")]
use portable_atomic_crate as atomic;

#[cfg(feature = "portable-atomic-alloc")]
use portable_atomic_util::{Arc, Weak};

use crate::task::AtomicWaker;
use atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
use atomic::{AtomicBool, AtomicPtr};
use core::cell::UnsafeCell;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::ptr;
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
use core::sync::atomic::{AtomicBool, AtomicPtr};
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::atomic::AtomicPtr;
use super::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use super::Arc;
use crate::task::AtomicWaker;
use alloc::sync::Arc;
use core::cell::UnsafeCell;
use core::ptr;
use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

use super::abort::abort;
use super::task::Task;
Expand Down
65 changes: 60 additions & 5 deletions futures-util/src/stream/futures_unordered/task.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,63 @@
use alloc::sync::{Arc, Weak};
use super::{atomic, Arc, Weak};

use core::cell::UnsafeCell;
use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
use core::sync::atomic::{AtomicBool, AtomicPtr};

use atomic::Ordering::{self, Relaxed, SeqCst};
use atomic::{AtomicBool, AtomicPtr};

use super::abort::abort;
use super::ReadyToRunQueue;
use crate::task::ArcWake;

/// Local version of `crate::arc_wake::ArcWake` to allow portable-atomic-util's
/// `Arc` to be used.
///
/// A way of waking up a specific task.
///
/// By implementing this trait, types that are expected to be wrapped in an `Arc`
/// can be converted into [`Waker`] objects.
/// Those Wakers can be used to signal executors that a task it owns
/// is ready to be `poll`ed again.
///
/// Currently, there are two ways to convert `ArcWake` into [`Waker`]:
///
/// * [`waker`](super::waker()) converts `Arc<impl ArcWake>` into [`Waker`].
/// * [`waker_ref`](super::waker_ref()) converts `&Arc<impl ArcWake>` into [`WakerRef`] that
/// provides access to a [`&Waker`][`Waker`].
///
/// [`Waker`]: std::task::Waker
/// [`WakerRef`]: super::WakerRef
// Note: Send + Sync required because `Arc<T>` doesn't automatically imply
// those bounds, but `Waker` implements them.
pub(crate) trait ArcWake: Send + Sync {
/// Indicates that the associated task is ready to make progress and should
/// be `poll`ed.
///
/// This function can be called from an arbitrary thread, including threads which
/// did not create the `ArcWake` based [`Waker`].
///
/// Executors generally maintain a queue of "ready" tasks; `wake` should place
/// the associated task onto this queue.
///
/// [`Waker`]: std::task::Waker
fn wake(this: Arc<Self>) {
Self::wake_by_ref(&this)
}

/// Indicates that the associated task is ready to make progress and should
/// be `poll`ed.
///
/// This function can be called from an arbitrary thread, including threads which
/// did not create the `ArcWake` based [`Waker`].
///
/// Executors generally maintain a queue of "ready" tasks; `wake_by_ref` should place
/// the associated task onto this queue.
///
/// This function is similar to [`wake`](ArcWake::wake), but must not consume the provided data
/// pointer.
///
/// [`Waker`]: std::task::Waker
fn wake_by_ref(arc_self: &Arc<Self>);
}

pub(super) struct Task<Fut> {
// The future
Expand Down Expand Up @@ -125,13 +177,16 @@ impl<Fut> Drop for Task<Fut> {
}

mod waker_ref {
use super::ArcWake;
#[cfg(not(feature = "portable-atomic-alloc"))]
use alloc::sync::Arc;
use core::marker::PhantomData;
use core::mem;
use core::mem::ManuallyDrop;
use core::ops::Deref;
use core::task::{RawWaker, RawWakerVTable, Waker};
use futures_task::ArcWake;
#[cfg(feature = "portable-atomic-alloc")]
use portable_atomic_util::Arc;

pub(crate) struct WakerRef<'a> {
waker: ManuallyDrop<Waker>,
Expand Down
7 changes: 5 additions & 2 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,13 @@ mod futures_ordered;
#[cfg(feature = "alloc")]
pub use self::futures_ordered::FuturesOrdered;

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg_attr(target_os = "none", cfg(any(target_has_atomic = "ptr", feature = "portable-atomic")))]
#[cfg(feature = "alloc")]
pub mod futures_unordered;
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg_attr(
target_os = "none",
cfg(any(target_has_atomic = "ptr", feature = "portable-atomic"))
)]
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use self::futures_unordered::FuturesUnordered;
Expand Down
Loading