Skip to content
This repository was archived by the owner on Nov 5, 2018. It is now read-only.
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 150 additions & 73 deletions src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@
/// ```
///
/// Much more straightforward.

use std::any::Any;
use std::cell::RefCell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::DerefMut;
use std::panic::{self, AssertUnwindSafe};
use std::panic;
use std::rc::Rc;
use std::thread;
use std::io;
use std::sync::{Arc, Mutex};
use std::thread;

#[doc(hidden)]
trait FnBox<T> {
Expand Down Expand Up @@ -152,50 +152,36 @@ where
{
let closure: Box<FnBox<T> + 'a> = Box::new(f);
let closure: Box<FnBox<T> + Send> = mem::transmute(closure);
builder.spawn(move || {
closure.call_box()
})
builder.spawn(move || closure.call_box())
}

pub struct Scope<'env> {
/// The list of the deferred functions and thread join jobs.
dtors: RefCell<Option<DtorChain<'env, thread::Result<()>>>>,
/// The list of the thread join jobs.
joins: RefCell<Vec<Box<FnBox<thread::Result<()>> + 'env>>>,
/// Thread panics invoked so far.
panics: RefCell<Vec<Box<Any + Send + 'static>>>,
// !Send + !Sync
_marker: PhantomData<*const ()>,
}

struct DtorChain<'env, T> {
dtor: Box<FnBox<T> + 'env>,
next: Option<Box<DtorChain<'env, T>>>,
}

impl<'env, T> DtorChain<'env, T> {
pub fn pop(chain: &mut Option<DtorChain<'env, T>>) -> Option<Box<FnBox<T> + 'env>> {
chain.take().map(|mut node| {
*chain = node.next.take().map(|b| *b);
node.dtor
})
}
}

struct JoinState<T> {
join_handle: thread::JoinHandle<()>,
result: ScopedThreadResult<T>
result: ScopedThreadResult<T>,
}

impl<T> JoinState<T> {
fn new(join_handle: thread::JoinHandle<()>, result: ScopedThreadResult<T>) -> JoinState<T> {
JoinState {
join_handle,
result
result,
}
}

fn join(self) -> thread::Result<T> {
let result = self.result;
self.join_handle.join().map(|_| {
result.lock().unwrap().take().unwrap()
})
self.join_handle
.join()
.map(|_| result.lock().unwrap().take().unwrap())
}
}

Expand All @@ -210,39 +196,49 @@ pub struct ScopedJoinHandle<'scope, T: 'scope> {
unsafe impl<'scope, T> Send for ScopedJoinHandle<'scope, T> {}
unsafe impl<'scope, T> Sync for ScopedJoinHandle<'scope, T> {}

/// Create a new `Scope` for [*scoped thread spawning*](struct.Scope.html#method.spawn).
/// Creates a new `Scope` for [*scoped thread spawning*](struct.Scope.html#method.spawn).
///
/// In addition, you can [register ad-hoc functions](struct.Scope.html#method.defer) that are
/// deferred to be run. No matter what happens, before the `Scope` is dropped, it is guaranteed that
/// all the unjoined spawned scoped threads are joined and the deferred functions are run.
/// No matter what happens, before the `Scope` is dropped, it is guaranteed that all the unjoined
/// spawned scoped threads are joined.
///
/// `thread::scope()` returns `Ok(())` if all the unjoined spawned threads and the deferred
/// functions did not panic. It returns `Err(e)` if one of them panics with `e`. If many of them
/// panics, it is still guaranteed that all the threads are joined and all the functions are run,
/// and `thread::scope()` returns `Err(e)` with `e` from a panicking thread or function.
/// `thread::scope()` returns `Ok(())` if all the unjoined spawned threads did not panic. It returns
/// `Err(e)` if one of them panics with `e`. If many of them panic, it is still guaranteed that all
/// the threads are joined, and `thread::scope()` returns `Err(e)` with `e` from a panicking thread.
///
/// # Examples
///
/// Creating and using a scope:
///
/// ```
/// crossbeam_utils::thread::scope(|scope| {
/// scope.defer(|| println!("Exiting scope"));
/// scope.spawn(|| println!("Exiting scope"));
/// scope.spawn(|| println!("Running child thread in scope"));
/// }).unwrap();
/// // Prints messages
/// ```
pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
where
F: FnOnce(&Scope<'env>) -> R,
{
let mut scope = Scope {
dtors: RefCell::new(None),
joins: RefCell::new(Vec::new()),
panics: RefCell::new(Vec::new()),
_marker: PhantomData,
};
let ret = f(&scope);
scope.drop_all()?;
Ok(ret)

// Executes the scoped function. Panic will be catched as `Err`.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));

// Joins all the threads.
scope.drop_all();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, it'd probably make sense to rename drop_all to join_all (nothing is dropped, really - we just join threads).

let panic = scope.panics.borrow_mut().pop();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this line, we could do:

let result = match result {
    Ok(r) => r,
    Err(e) => panic::resume_unwind(e),
};

And return the result as Ok(result) at the end of function.

This way we only catch panics inside spawned threads, not inside the scope itself.

Do you agree with this, or do you think we should catch panics inside the scope, too? I'm personally not 100% sure yet...

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I'm also not 100% sure. So let's look at how rayon does: https://docs.rs/rayon/1.0.2/rayon/fn.scope.html#panics "If a panic occurs, either in the closure given to scope() or in any of the spawned jobs, that panic will be propagated and the call to scope() will panic. If multiple panics occurs, it is non-deterministic which of their panic values will propagate. Regardless, once a task is spawned using scope.spawn(), it will execute, even if the spawning task should later panic. scope() returns once all spawned jobs have completed, and any panics are propagated at that point."

Basically rayon doesn't distinguish panics inside scope() and panics of the spawned threads. I'd like to follow the same thing: either (1) propagating a panic, as rayon does, or (2) returning a panic as Err, but not mixing two schemes. We've decided to return thread panics as Err, but we've not released it and we're free to go back to the old behavior (propagating it). An advantage of it is it's behavior/signature is matching with rayon.

What do you think?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, following whatever Rayon does makes sense.

I'll also ask one more person for opinion, prz on IRC, who's been following recent developments in crossbeam-utils.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there are a number of differences between scopes in crossbeam and rayon:

  • In crossbeam, spawned threads can return a result. In rayon, they can't.
  • In crossbeam, spawned threads can be manually joined. In rayon, they can't.
  • In crossbeam, panics from spawned threads can be handles using .join(). In rayon, they can't.
  • In crossbeam, panics from unjoined threads can be handled as the Result returned from scope. In rayon, scope simply propagates all panics and returns a T, not Result.

Basically, it looks like in rayon all panics are propagated, while in crossbeam panics tend to be caught and handled manually.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree -- let's return Result instead of propagating panic. I'm merging it. Thanks!


// If any of the threads panicked, returns the panic's payload.
if let Some(payload) = panic {
return Err(payload);
}

// Returns the result of the scoped function.
result
}

impl<'env> fmt::Debug for Scope<'env> {
Expand All @@ -262,34 +258,17 @@ impl<'env> Scope<'env> {
// and, if any dtor panics, can be resumed in the unwinding this causes. By initially running
// the method outside of any destructor, we avoid any leakage problems due to
// @rust-lang/rust#14875.
fn drop_all(&mut self) -> thread::Result<()> {
let mut ret = Ok(());
while let Some(dtor) = DtorChain::pop(&mut self.dtors.borrow_mut()) {
ret = ret.and(dtor.call_box());
//
// FIXME(jeehoonkang): @rust-lang/rust#14875 is fixed, so maybe we can remove the above comment.
// But I'd like to write tests to check it before removing the comment.
fn drop_all(&mut self) {
let mut joins = self.joins.borrow_mut();
for join in joins.drain(..) {
let result = join.call_box();
if let Err(payload) = result {
self.panics.borrow_mut().push(payload);
}
}
ret
}

fn defer_inner<F>(&self, f: F)
where
F: (FnOnce() -> thread::Result<()>) + 'env,
{
let mut dtors = self.dtors.borrow_mut();
*dtors = Some(DtorChain {
dtor: Box::new(f),
next: dtors.take().map(Box::new),
});
}

/// Schedule code to be executed when exiting the scope.
///
/// This is akin to having a destructor on the stack, except that it is *guaranteed* to be
/// run. It is guaranteed that the function is called after all the spawned threads are joined.
pub fn defer<F>(&self, f: F)
where
F: FnOnce() + 'env,
{
self.defer_inner(move || panic::catch_unwind(AssertUnwindSafe(f)));
}

/// Create a scoped thread.
Expand Down Expand Up @@ -363,14 +342,14 @@ impl<'scope, 'env: 'scope> ScopedThreadBuilder<'scope, 'env> {
let deferred_handle = Rc::new(RefCell::new(Some(join_state)));
let my_handle = deferred_handle.clone();

self.scope.defer_inner(move || {
self.scope.joins.borrow_mut().push(Box::new(move || {
let state = deferred_handle.borrow_mut().deref_mut().take();
if let Some(state) = state {
state.join().map(|_| ())
} else {
Ok(())
}
});
}));

Ok(ScopedJoinHandle {
inner: my_handle,
Expand Down Expand Up @@ -407,9 +386,107 @@ impl<'scope, T> ScopedJoinHandle<'scope, T> {

impl<'env> Drop for Scope<'env> {
fn drop(&mut self) {
// Actually, there should be no deferred functions left to be run.
self.drop_all().unwrap();
// Note that `self.dtors` can be non-empty when the code inside a `scope()` panics and

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment needs to be updated. self.dtors doesn't exist anymore.

// `drop()` is called in unwinding. Even if it's the case, we will join the unjoined
// threads.
//
// We ignore panics from any threads because we're in course of unwinding anyway.
self.drop_all();
}
}

type ScopedThreadResult<T> = Arc<Mutex<Option<T>>>;

#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::{thread, time};

const TIMES: usize = 10;
const SMALL_STACK_SIZE: usize = 20;

#[test]
fn join() {
let counter = AtomicUsize::new(0);
scope(|scope| {
let handle = scope.spawn(|| {
counter.store(1, Ordering::Relaxed);
});
assert!(handle.join().is_ok());

let panic_handle = scope.spawn(|| {
panic!("\"My honey is running out!\", said Pooh.");
});
assert!(panic_handle.join().is_err());
}).unwrap();

// There should be sufficient synchronization.
assert_eq!(1, counter.load(Ordering::Relaxed));
}

#[test]
fn counter() {
let counter = AtomicUsize::new(0);
scope(|scope| {
for _ in 0..TIMES {
scope.spawn(|| {
counter.fetch_add(1, Ordering::Relaxed);
});
}
}).unwrap();

assert_eq!(TIMES, counter.load(Ordering::Relaxed));
}

#[test]
fn counter_builder() {
let counter = AtomicUsize::new(0);
scope(|scope| {
for i in 0..TIMES {
scope
.builder()
.name(format!("child-{}", i))
.stack_size(SMALL_STACK_SIZE)
.spawn(|| {
counter.fetch_add(1, Ordering::Relaxed);
})
.unwrap();
}
}).unwrap();

assert_eq!(TIMES, counter.load(Ordering::Relaxed));
}

#[test]
fn counter_panic() {
let counter = AtomicUsize::new(0);
let result = scope(|scope| {
scope.spawn(|| {
panic!("\"My honey is running out!\", said Pooh.");
});
thread::sleep(time::Duration::from_millis(100));

for _ in 0..TIMES {
scope.spawn(|| {
counter.fetch_add(1, Ordering::Relaxed);
});
}
});

assert_eq!(TIMES, counter.load(Ordering::Relaxed));
assert!(result.is_err());
}

#[test]
fn panic_twice() {
let result = scope(|scope| {
scope.spawn(|| {
panic!();
});
panic!();
});
assert!(result.is_err());
}
}