// Mirror of https://github.com/rust-lang/rust.git
// Synced 2026-05-07 09:13:07 +03:00
// 429 lines, 14 KiB, Rust
#![cfg(test)]
|
|
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
use std::sync::mpsc::channel;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use crate::{Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder, join};
|
|
|
|
/// A panic raised inside the closure passed to `install` must reach the
/// caller; `should_panic` checks that the panic message contains the
/// expected text.
#[test]
#[should_panic(expected = "Hello, world!")]
fn panic_propagate() {
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
    thread_pool.install(|| {
        panic!("Hello, world!");
    });
}
|
|
|
|
/// Dropping a thread pool that has done some work should terminate its
/// registry, after which `wait_until_stopped` must return.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn workers_stop() {
    let registry;

    {
        // once we exit this block, thread-pool will be dropped
        let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
        registry = thread_pool.install(|| {
            // do some work on these threads
            join_a_lot(22);

            // Keep the registry alive past the pool so we can wait on it below.
            Arc::clone(&thread_pool.registry)
        });
        assert_eq!(registry.num_threads(), 22);
    }

    // once thread-pool is dropped, registry should terminate, which
    // should lead to worker threads stopping
    registry.wait_until_stopped();
}
|
|
|
|
fn join_a_lot(n: usize) {
|
|
if n > 0 {
|
|
join(|| join_a_lot(n - 1), || join_a_lot(n - 1));
|
|
}
|
|
}
|
|
|
|
/// Like `workers_stop`, but the pool is given no work: it is left idle for a
/// second before being dropped, and the registry must still stop.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sleeper_stop() {
    use std::{thread, time};

    let registry;

    {
        // once we exit this block, thread-pool will be dropped
        let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
        registry = Arc::clone(&thread_pool.registry);

        // Give time for at least some of the thread pool to fall asleep.
        thread::sleep(time::Duration::from_secs(1));
    }

    // once thread-pool is dropped, registry should terminate, which
    // should lead to worker threads stopping
    registry.wait_until_stopped();
}
|
|
|
|
/// Creates a start/exit handler that increments an atomic counter.
///
/// Returns the shared counter together with a closure that adds one to it on
/// every call; the closure's `usize` argument is ignored. The closure owns
/// one `Arc` handle, so the counter stays shared until the closure is dropped.
fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
    let count = Arc::new(AtomicUsize::new(0));
    (Arc::clone(&count), move |_| {
        count.fetch_add(1, Ordering::SeqCst);
    })
}
|
|
|
|
/// Wait until a counter is no longer shared, then return its value.
///
/// Tries to unwrap the `Arc` up to 60 times, sleeping one second after each
/// failed attempt.
///
/// # Panics
///
/// Panics if the counter is still shared after all attempts.
fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
    use std::{thread, time};

    for _ in 0..60 {
        counter = match Arc::try_unwrap(counter) {
            // We hold the only reference: the final value is known.
            Ok(counter) => return counter.into_inner(),
            // Someone else still holds a handle; back off and retry.
            Err(counter) => {
                thread::sleep(time::Duration::from_secs(1));
                counter
            }
        };
    }

    // That's too long!
    panic!("Counter is still shared!");
}
|
|
|
|
/// Building a pool with an absurdly large stack size must fail, and every
/// thread that did start must also have run its exit handler.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn failed_thread_stack() {
    // Note: we first tried to force failure with a `usize::MAX` stack, but
    // macOS and Windows weren't fazed, or at least didn't fail the way we want.
    // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a
    // 2GB stack, so it might not fail until the second thread.
    let stack_size = isize::MAX as usize;

    let (start_count, start_handler) = count_handler();
    let (exit_count, exit_handler) = count_handler();
    let builder = ThreadPoolBuilder::new()
        .num_threads(10)
        .stack_size(stack_size)
        .start_handler(start_handler)
        .exit_handler(exit_handler);

    let pool = builder.build();
    assert!(pool.is_err(), "thread stack should have failed!");

    // With such a huge stack, 64-bit will probably fail on the first thread;
    // 32-bit might manage the first 2GB, but certainly fail the second.
    let start_count = wait_for_counter(start_count);
    assert!(start_count <= 1);
    assert_eq!(start_count, wait_for_counter(exit_count));
}
|
|
|
|
/// A panic in the `thread_name` callback must propagate out of `build`, and
/// the threads started before the panic must all have exited.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_thread_name() {
    let (start_count, start_handler) = count_handler();
    let (exit_count, exit_handler) = count_handler();
    let builder = ThreadPoolBuilder::new()
        .num_threads(10)
        .start_handler(start_handler)
        .exit_handler(exit_handler)
        .thread_name(|i| {
            // Naming succeeds for indices 0..=4 and panics from index 5 on.
            if i >= 5 {
                panic!();
            }
            format!("panic_thread_name#{}", i)
        });

    // Catch the unwinding panic so it can be asserted on instead of aborting the test.
    let pool = crate::unwind::halt_unwinding(|| builder.build());
    assert!(pool.is_err(), "thread-name panic should propagate!");

    // Assuming they're created in order, threads 0 through 4 should have
    // been started already, and then terminated by the panic.
    assert_eq!(5, wait_for_counter(start_count));
    assert_eq!(5, wait_for_counter(exit_count));
}
|
|
|
|
/// Calling `install` on a single-threaded pool from inside that same pool
/// must run the inner closure rather than block.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn self_install() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    // If the inner `install` blocks, then nothing will actually run it!
    assert!(pool.install(|| pool.install(|| true)));
}
|
|
|
|
// FIXME: We should fix or remove this ignored test.
/// Two single-threaded pools install into each other in a cycle
/// (`pool1` -> `pool2` -> `pool1`); the innermost closure must still run.
#[test]
#[ignore]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install() {
    let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    let ok = pool1.install(|| {
        // This creates a dependency from `pool1` -> `pool2`
        pool2.install(|| {
            // This creates a dependency from `pool2` -> `pool1`
            pool1.install(|| {
                // If they blocked on inter-pool installs, there would be no
                // threads left to run this!
                true
            })
        })
    });
    assert!(ok);
}
|
|
|
|
// FIXME: We should fix or remove this ignored test.
/// Same cyclic install as `mutual_install`, but with one-second sleeps so
/// each pool has time to fall asleep before the other pool calls back into it.
#[test]
#[ignore]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install_sleepy() {
    use std::{thread, time};

    let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    let ok = pool1.install(|| {
        // This creates a dependency from `pool1` -> `pool2`
        pool2.install(|| {
            // Give `pool1` time to fall asleep.
            thread::sleep(time::Duration::from_secs(1));

            // This creates a dependency from `pool2` -> `pool1`
            pool1.install(|| {
                // Give `pool2` time to fall asleep.
                thread::sleep(time::Duration::from_secs(1));

                // If they blocked on inter-pool installs, there would be no
                // threads left to run this!
                true
            })
        })
    });
    assert!(ok);
}
|
|
|
|
/// The deprecated `ThreadPool::new(Configuration)` constructor must still
/// honor the requested thread count.
#[test]
#[allow(deprecated)] // this test deliberately exercises the deprecated API
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_thread_pool_new() {
    let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
    assert_eq!(pool.current_num_threads(), 22);
}
|
|
|
|
/// Expands to an expression that records the order in which ten scoped
/// spawns run on a single-threaded pool.
///
/// `$scope` is the pool method that opens the scope and `$spawn` is the scope
/// method that queues a task. Each task pushes its index `0..10` into a
/// shared vector, which is the value of the expansion.
macro_rules! test_scope_order {
    ($scope:ident => $spawn:ident) => {{
        let builder = ThreadPoolBuilder::new().num_threads(1);
        let pool = builder.build().unwrap();
        pool.install(|| {
            let vec = Mutex::new(vec![]);
            pool.$scope(|scope| {
                // Shadow with a reference so the `move` closures copy the
                // borrow instead of taking ownership of the mutex.
                let vec = &vec;
                for i in 0..10 {
                    scope.$spawn(move |_| {
                        vec.lock().unwrap().push(i);
                    });
                }
            });
            vec.into_inner().unwrap()
        })
    }};
}
|
|
|
|
// FIXME: We should fix or remove this ignored test.
/// Tasks queued with `scope` + `spawn` are expected to run in reverse
/// spawn order.
#[test]
#[ignore]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_lifo_order() {
    let vec = test_scope_order!(scope => spawn);
    let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
    assert_eq!(vec, expected);
}
|
|
|
|
// FIXME: We should fix or remove this ignored test.
/// Tasks queued with `scope_fifo` + `spawn_fifo` are expected to run in
/// spawn order.
#[test]
#[ignore]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_fifo_order() {
    let vec = test_scope_order!(scope_fifo => spawn_fifo);
    let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
    assert_eq!(vec, expected);
}
|
|
|
|
/// Expands to an expression that records the order in which ten pool-level
/// spawns run on a single-threaded pool.
///
/// `$spawn` is the pool method used to queue each task. Each task sends its
/// index `0..10` over a channel; the expansion evaluates to the received
/// values collected into a `Vec<i32>`.
macro_rules! test_spawn_order {
    ($spawn:ident) => {{
        let builder = ThreadPoolBuilder::new().num_threads(1);
        let pool = &builder.build().unwrap();
        let (tx, rx) = channel();
        // `move` takes the original sender into the closure, so it is dropped
        // when the closure finishes; `rx.iter()` below then ends once every
        // per-task clone has been dropped too.
        pool.install(move || {
            for i in 0..10 {
                let tx = tx.clone();
                pool.$spawn(move || {
                    tx.send(i).unwrap();
                });
            }
        });
        rx.iter().collect::<Vec<i32>>()
    }};
}
|
|
|
|
/// Tasks queued with `spawn` from inside the pool are expected to run in
/// reverse spawn order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_lifo_order() {
    let vec = test_spawn_order!(spawn);
    let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
    assert_eq!(vec, expected);
}
|
|
|
|
/// Tasks queued with `spawn_fifo` from inside the pool are expected to run
/// in spawn order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_fifo_order() {
    let vec = test_spawn_order!(spawn_fifo);
    let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
    assert_eq!(vec, expected);
}
|
|
|
|
// FIXME: We should fix or remove this ignored test.
/// Opens one scope per pool, nested inside each other, then spawns one task
/// into every scope and checks that each task ran exactly once.
#[test]
#[ignore]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_scopes() {
    // Create matching scopes for every thread pool.
    //
    // Recurses over `pools`, opening a scope on the head and collecting it in
    // `scopes`; once `pools` is exhausted, `op` is called with all the scopes.
    fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
    where
        OP: FnOnce(&[&Scope<'scope>]) + Send,
    {
        if let Some((pool, tail)) = pools.split_first() {
            pool.scope(move |s| {
                // This move reduces the reference lifetimes by variance to match s,
                // but the actual scopes are still tied to the invariant 'scope.
                let mut scopes = scopes;
                scopes.push(s);
                nest(tail, scopes, op)
            })
        } else {
            (op)(&scopes)
        }
    }

    let pools: Vec<_> =
        (0..10).map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()).collect();

    let counter = AtomicUsize::new(0);
    nest(&pools, vec![], |scopes| {
        for &s in scopes {
            s.spawn(|_| {
                // Our 'scope lets us borrow the counter in every pool.
                counter.fetch_add(1, Ordering::Relaxed);
            });
        }
    });
    assert_eq!(counter.into_inner(), pools.len());
}
|
|
|
|
// FIXME: We should fix or remove this ignored test.
/// FIFO variant of `nested_scopes`: opens one `scope_fifo` per pool, nested
/// inside each other, spawns one task into every scope with `spawn_fifo`, and
/// checks that each task ran exactly once.
#[test]
#[ignore]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_scopes() {
    // Create matching fifo scopes for every thread pool.
    //
    // Recurses over `pools`, opening a fifo scope on the head and collecting
    // it in `scopes`; once `pools` is exhausted, `op` is called with all of them.
    fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
    where
        OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
    {
        if let Some((pool, tail)) = pools.split_first() {
            pool.scope_fifo(move |s| {
                // This move reduces the reference lifetimes by variance to match s,
                // but the actual scopes are still tied to the invariant 'scope.
                let mut scopes = scopes;
                scopes.push(s);
                nest(tail, scopes, op)
            })
        } else {
            (op)(&scopes)
        }
    }

    let pools: Vec<_> =
        (0..10).map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()).collect();

    let counter = AtomicUsize::new(0);
    nest(&pools, vec![], |scopes| {
        for &s in scopes {
            s.spawn_fifo(|_| {
                // Our 'scope lets us borrow the counter in every pool.
                counter.fetch_add(1, Ordering::Relaxed);
            });
        }
    });
    assert_eq!(counter.into_inner(), pools.len());
}
|
|
|
|
/// `in_place_scope` on a single-threaded pool must not deadlock when the
/// scope body waits for a task it spawned.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_no_deadlock() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let (tx, rx) = channel();
    // Borrow the receiver so the `move` closure below captures a reference
    // rather than the receiver itself.
    let rx_ref = &rx;
    pool.in_place_scope(move |s| {
        // With regular scopes this closure would never run because this scope op
        // itself would block the only worker thread.
        s.spawn(move |_| {
            tx.send(()).unwrap();
        });
        rx_ref.recv().unwrap();
    });
}
|
|
|
|
/// `in_place_scope_fifo` on a single-threaded pool must not deadlock when the
/// scope body waits for a task it spawned with `spawn_fifo`.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_fifo_no_deadlock() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let (tx, rx) = channel();
    // Borrow the receiver so the `move` closure below captures a reference
    // rather than the receiver itself.
    let rx_ref = &rx;
    pool.in_place_scope_fifo(move |s| {
        // With regular scopes this closure would never run because this scope op
        // itself would block the only worker thread.
        s.spawn_fifo(move |_| {
            tx.send(()).unwrap();
        });
        rx_ref.recv().unwrap();
    });
}
|
|
|
|
/// A task queued with `crate::spawn` must get to run once a worker calls
/// `yield_now`, even in the single-threaded fallback mode.
#[test]
fn yield_now_to_spawn() {
    let (tx, rx) = channel();

    // Queue a regular spawn.
    crate::spawn(move || tx.send(22).unwrap());

    // The single-threaded fallback mode (for wasm etc.) won't
    // get a chance to run the spawn if we never yield to it.
    crate::registry::in_worker(move |_, _| {
        crate::yield_now();
    });

    // The spawn **must** have started by now, but we still might have to wait
    // for it to finish if a different thread stole it first.
    assert_eq!(22, rx.recv().unwrap());
}
|
|
|
|
/// A task queued with `crate::spawn` must get to run once a worker calls
/// `yield_local`, even in the single-threaded fallback mode.
#[test]
fn yield_local_to_spawn() {
    let (tx, rx) = channel();

    // Queue a regular spawn.
    crate::spawn(move || tx.send(22).unwrap());

    // The single-threaded fallback mode (for wasm etc.) won't
    // get a chance to run the spawn if we never yield to it.
    crate::registry::in_worker(move |_, _| {
        crate::yield_local();
    });

    // The spawn **must** have started by now, but we still might have to wait
    // for it to finish if a different thread stole it first.
    assert_eq!(22, rx.recv().unwrap());
}
|