Auto merge of #1362 - vakaras:add-sync-primitives-cr1, r=RalfJung

Add sync primitives

This is a follow up PR for https://github.com/rust-lang/miri/pull/1284 that adds support for the missing synchronization primitives.

Sorry for flooding with PRs, but my internship is coming to an end and I need to get things out.

Fixes https://github.com/rust-lang/miri/issues/1419
This commit is contained in:
bors
2020-05-25 07:02:53 +00:00
17 changed files with 1521 additions and 386 deletions
+6
View File
@@ -210,6 +210,12 @@ pub fn eval_main<'tcx>(tcx: TyCtxt<'tcx>, main_id: DefId, config: MiriConfig) ->
SchedulingAction::ExecuteStep => {
assert!(ecx.step()?, "a terminated thread was scheduled for execution");
}
SchedulingAction::ExecuteTimeoutCallback => {
assert!(ecx.machine.communicate,
"scheduler callbacks require disabled isolation, but the code \
that created the callback did not check it");
ecx.run_timeout_callback()?;
}
SchedulingAction::ExecuteDtors => {
// This will either enable the thread again (so we go back
// to `ExecuteStep`), or determine that this thread is done
+5 -1
View File
@@ -31,6 +31,7 @@
mod range_map;
mod shims;
mod stacked_borrows;
mod sync;
mod thread;
// Make all those symbols available in the same place as our own.
@@ -45,7 +46,7 @@
pub use crate::shims::intrinsics::EvalContextExt as IntrinsicsEvalContextExt;
pub use crate::shims::os_str::EvalContextExt as OsStrEvalContextExt;
pub use crate::shims::panic::{CatchUnwindData, EvalContextExt as PanicEvalContextExt};
pub use crate::shims::sync::{EvalContextExt as SyncEvalContextExt};
pub use crate::shims::sync::{EvalContextExt as SyncShimsEvalContextExt};
pub use crate::shims::thread::EvalContextExt as ThreadShimsEvalContextExt;
pub use crate::shims::time::EvalContextExt as TimeEvalContextExt;
pub use crate::shims::tls::{EvalContextExt as TlsEvalContextExt, TlsData};
@@ -70,6 +71,9 @@
pub use crate::thread::{
EvalContextExt as ThreadsEvalContextExt, SchedulingAction, ThreadId, ThreadManager, ThreadState,
};
pub use crate::sync::{
EvalContextExt as SyncEvalContextExt, CondvarId, MutexId, RwLockId
};
/// Insert rustc arguments at the beginning of the argument list that Miri wants to be
/// set per default, for maximal validation power.
+49 -5
View File
@@ -330,6 +330,55 @@ fn emulate_foreign_item_by_name(
let result = this.pthread_rwlock_destroy(rwlock)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_condattr_init" => {
let &[attr] = check_arg_count(args)?;
let result = this.pthread_condattr_init(attr)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_condattr_setclock" => {
let &[attr, clock_id] = check_arg_count(args)?;
let result = this.pthread_condattr_setclock(attr, clock_id)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_condattr_getclock" => {
let &[attr, clock_id] = check_arg_count(args)?;
let result = this.pthread_condattr_getclock(attr, clock_id)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_condattr_destroy" => {
let &[attr] = check_arg_count(args)?;
let result = this.pthread_condattr_destroy(attr)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_init" => {
let &[cond, attr] = check_arg_count(args)?;
let result = this.pthread_cond_init(cond, attr)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_signal" => {
let &[cond] = check_arg_count(args)?;
let result = this.pthread_cond_signal(cond)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_broadcast" => {
let &[cond] = check_arg_count(args)?;
let result = this.pthread_cond_broadcast(cond)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_wait" => {
let &[cond, mutex] = check_arg_count(args)?;
let result = this.pthread_cond_wait(cond, mutex)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
"pthread_cond_timedwait" => {
let &[cond, mutex, abstime] = check_arg_count(args)?;
this.pthread_cond_timedwait(cond, mutex, abstime, dest)?;
}
"pthread_cond_destroy" => {
let &[cond] = check_arg_count(args)?;
let result = this.pthread_cond_destroy(cond)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
}
// Threading
"pthread_create" => {
@@ -391,16 +440,11 @@ fn emulate_foreign_item_by_name(
| "pthread_attr_init"
| "pthread_attr_destroy"
| "pthread_condattr_init"
| "pthread_condattr_destroy"
| "pthread_cond_destroy"
if this.frame().instance.to_string().starts_with("std::sys::unix::") => {
let &[_] = check_arg_count(args)?;
this.write_null(dest)?;
}
| "pthread_cond_init"
| "pthread_attr_setstacksize"
| "pthread_condattr_setclock"
if this.frame().instance.to_string().starts_with("std::sys::unix::") => {
let &[_, _] = check_arg_count(args)?;
this.write_null(dest)?;
+504 -250
View File
@@ -1,8 +1,12 @@
use std::convert::TryInto;
use std::time::{Duration, SystemTime};
use rustc_middle::ty::{layout::TyAndLayout, TyKind, TypeAndMut};
use rustc_target::abi::{LayoutOf, Size};
use crate::stacked_borrows::Tag;
use crate::thread::BlockSetId;
use crate::thread::Time;
use crate::*;
fn assert_ptr_target_min_size<'mir, 'tcx: 'mir>(
@@ -54,8 +58,31 @@ fn set_at_offset<'mir, 'tcx: 'mir>(
// store an i32 in the first four bytes equal to the corresponding libc mutex kind constant
// (e.g. PTHREAD_MUTEX_NORMAL).
/// A flag that allows to distinguish `PTHREAD_MUTEX_NORMAL` from
/// `PTHREAD_MUTEX_DEFAULT`. Since in `glibc` they have the same numeric values,
/// but different behaviour, we need a way to distinguish them. We do this by
/// setting this bit flag to the `PTHREAD_MUTEX_NORMAL` mutexes. See the comment
/// in `pthread_mutexattr_settype` function.
const PTHREAD_MUTEX_NORMAL_FLAG: i32 = 0x8000000;
const PTHREAD_MUTEXATTR_T_MIN_SIZE: u64 = 4;
fn is_mutex_kind_default<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
kind: Scalar<Tag>,
) -> InterpResult<'tcx, bool> {
Ok(kind == ecx.eval_libc("PTHREAD_MUTEX_DEFAULT")?)
}
fn is_mutex_kind_normal<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
kind: Scalar<Tag>,
) -> InterpResult<'tcx, bool> {
let kind = kind.to_i32()?;
let mutex_normal_kind = ecx.eval_libc("PTHREAD_MUTEX_NORMAL")?.to_i32()?;
Ok(kind == (mutex_normal_kind | PTHREAD_MUTEX_NORMAL_FLAG))
}
fn mutexattr_get_kind<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
attr_op: OpTy<'tcx, Tag>,
@@ -76,45 +103,12 @@ fn mutexattr_set_kind<'mir, 'tcx: 'mir>(
// Our chosen memory layout for the emulated mutex (does not have to match the platform layout!):
// bytes 0-3: reserved for signature on macOS
// (need to avoid this because it is set by static initializer macros)
// bytes 4-7: count of how many times this mutex has been locked, as a u32
// bytes 8-11: when count > 0, id of the owner thread as a u32
// bytes 4-7: mutex id as u32 or 0 if id is not assigned yet.
// bytes 12-15 or 16-19 (depending on platform): mutex kind, as an i32
// (the kind has to be at its offset for compatibility with static initializer macros)
// bytes 20-23: when count > 0, id of the blockset in which the blocked threads
// are waiting or 0 if blockset is not yet assigned.
const PTHREAD_MUTEX_T_MIN_SIZE: u64 = 24;
fn mutex_get_locked_count<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, mutex_op, 4, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_set_locked_count<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
locked_count: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, mutex_op, 4, locked_count, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_get_owner<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, mutex_op, 8, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_set_owner<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
owner: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, mutex_op, 8, owner, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_get_kind<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
@@ -132,34 +126,34 @@ fn mutex_set_kind<'mir, 'tcx: 'mir>(
set_at_offset(ecx, mutex_op, offset, kind, ecx.machine.layouts.i32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_get_blockset<'mir, 'tcx: 'mir>(
fn mutex_get_id<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, mutex_op, 20, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
get_at_offset(ecx, mutex_op, 4, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_set_blockset<'mir, 'tcx: 'mir>(
fn mutex_set_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
blockset: impl Into<ScalarMaybeUninit<Tag>>,
id: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, mutex_op, 20, blockset, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
set_at_offset(ecx, mutex_op, 4, id, ecx.machine.layouts.u32, PTHREAD_MUTEX_T_MIN_SIZE)
}
fn mutex_get_or_create_blockset<'mir, 'tcx: 'mir>(
fn mutex_get_or_create_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, BlockSetId> {
let blockset = mutex_get_blockset(ecx, mutex_op)?.to_u32()?;
if blockset == 0 {
// 0 is a default value and also not a valid blockset id. Need to
// allocate a new blockset.
let blockset = ecx.create_blockset()?;
mutex_set_blockset(ecx, mutex_op, blockset.to_u32_scalar())?;
Ok(blockset)
) -> InterpResult<'tcx, MutexId> {
let id = mutex_get_id(ecx, mutex_op)?.to_u32()?;
if id == 0 {
// 0 is a default value and also not a valid mutex id. Need to allocate
// a new mutex.
let id = ecx.mutex_create();
mutex_set_id(ecx, mutex_op, id.to_u32_scalar())?;
Ok(id)
} else {
Ok(BlockSetId::new(blockset))
Ok(MutexId::from_u32(id))
}
}
@@ -168,107 +162,168 @@ fn mutex_get_or_create_blockset<'mir, 'tcx: 'mir>(
// Our chosen memory layout for the emulated rwlock (does not have to match the platform layout!):
// bytes 0-3: reserved for signature on macOS
// (need to avoid this because it is set by static initializer macros)
// bytes 4-7: reader count, as a u32
// bytes 8-11: writer count, as a u32
// bytes 12-15: when writer or reader count > 0, id of the blockset in which the
// blocked writers are waiting or 0 if blockset is not yet assigned.
// bytes 16-20: when writer count > 0, id of the blockset in which the blocked
// readers are waiting or 0 if blockset is not yet assigned.
// bytes 4-7: rwlock id as u32 or 0 if id is not assigned yet.
const PTHREAD_RWLOCK_T_MIN_SIZE: u64 = 20;
const PTHREAD_RWLOCK_T_MIN_SIZE: u64 = 32;
fn rwlock_get_readers<'mir, 'tcx: 'mir>(
fn rwlock_get_id<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, rwlock_op, 4, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_set_readers<'mir, 'tcx: 'mir>(
fn rwlock_set_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
readers: impl Into<ScalarMaybeUninit<Tag>>,
id: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, rwlock_op, 4, readers, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
set_at_offset(ecx, rwlock_op, 4, id, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_get_writers<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, rwlock_op, 8, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_set_writers<'mir, 'tcx: 'mir>(
fn rwlock_get_or_create_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
writers: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, rwlock_op, 8, writers, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_get_writer_blockset<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, rwlock_op, 12, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_set_writer_blockset<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
blockset: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, rwlock_op, 12, blockset, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
}
fn rwlock_get_or_create_writer_blockset<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, BlockSetId> {
let blockset = rwlock_get_writer_blockset(ecx, rwlock_op)?.to_u32()?;
if blockset == 0 {
// 0 is a default value and also not a valid blockset id. Need to
// allocate a new blockset.
let blockset = ecx.create_blockset()?;
rwlock_set_writer_blockset(ecx, rwlock_op, blockset.to_u32_scalar())?;
Ok(blockset)
) -> InterpResult<'tcx, RwLockId> {
let id = rwlock_get_id(ecx, rwlock_op)?.to_u32()?;
if id == 0 {
// 0 is a default value and also not a valid rwlock id. Need to allocate
// a new read-write lock.
let id = ecx.rwlock_create();
rwlock_set_id(ecx, rwlock_op, id.to_u32_scalar())?;
Ok(id)
} else {
Ok(BlockSetId::new(blockset))
Ok(RwLockId::from_u32(id))
}
}
fn rwlock_get_reader_blockset<'mir, 'tcx: 'mir>(
// pthread_condattr_t
// Our chosen memory layout for emulation (does not have to match the platform layout!):
// store an i32 in the first four bytes equal to the corresponding libc clock id constant
// (e.g. CLOCK_REALTIME).
const PTHREAD_CONDATTR_T_MIN_SIZE: u64 = 4;
fn condattr_get_clock_id<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
attr_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, rwlock_op, 16, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
get_at_offset(ecx, attr_op, 0, ecx.machine.layouts.i32, PTHREAD_CONDATTR_T_MIN_SIZE)
}
fn rwlock_set_reader_blockset<'mir, 'tcx: 'mir>(
fn condattr_set_clock_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
blockset: impl Into<ScalarMaybeUninit<Tag>>,
attr_op: OpTy<'tcx, Tag>,
clock_id: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, rwlock_op, 16, blockset, ecx.machine.layouts.u32, PTHREAD_RWLOCK_T_MIN_SIZE)
set_at_offset(ecx, attr_op, 0, clock_id, ecx.machine.layouts.i32, PTHREAD_CONDATTR_T_MIN_SIZE)
}
fn rwlock_get_or_create_reader_blockset<'mir, 'tcx: 'mir>(
// pthread_cond_t
// Our chosen memory layout for the emulated conditional variable (does not have
// to match the platform layout!):
// bytes 0-3: reserved for signature on macOS
// bytes 4-7: the conditional variable id as u32 or 0 if id is not assigned yet.
// bytes 8-11: the clock id constant as i32
const PTHREAD_COND_T_MIN_SIZE: u64 = 12;
fn cond_get_id<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
cond_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, cond_op, 4, ecx.machine.layouts.u32, PTHREAD_COND_T_MIN_SIZE)
}
fn cond_set_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
rwlock_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, BlockSetId> {
let blockset = rwlock_get_reader_blockset(ecx, rwlock_op)?.to_u32()?;
if blockset == 0 {
// 0 is a default value and also not a valid blockset id. Need to
// allocate a new blockset.
let blockset = ecx.create_blockset()?;
rwlock_set_reader_blockset(ecx, rwlock_op, blockset.to_u32_scalar())?;
Ok(blockset)
cond_op: OpTy<'tcx, Tag>,
id: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, cond_op, 4, id, ecx.machine.layouts.u32, PTHREAD_COND_T_MIN_SIZE)
}
fn cond_get_or_create_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
cond_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, CondvarId> {
let id = cond_get_id(ecx, cond_op)?.to_u32()?;
if id == 0 {
// 0 is a default value and also not a valid conditional variable id.
// Need to allocate a new id.
let id = ecx.condvar_create();
cond_set_id(ecx, cond_op, id.to_u32_scalar())?;
Ok(id)
} else {
Ok(BlockSetId::new(blockset))
Ok(CondvarId::from_u32(id))
}
}
fn cond_get_clock_id<'mir, 'tcx: 'mir>(
ecx: &MiriEvalContext<'mir, 'tcx>,
cond_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
get_at_offset(ecx, cond_op, 8, ecx.machine.layouts.i32, PTHREAD_COND_T_MIN_SIZE)
}
fn cond_set_clock_id<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
cond_op: OpTy<'tcx, Tag>,
clock_id: impl Into<ScalarMaybeUninit<Tag>>,
) -> InterpResult<'tcx, ()> {
set_at_offset(ecx, cond_op, 8, clock_id, ecx.machine.layouts.i32, PTHREAD_COND_T_MIN_SIZE)
}
/// Try to reacquire the mutex associated with the condition variable after we
/// were signaled.
fn reacquire_cond_mutex<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
thread: ThreadId,
mutex: MutexId,
) -> InterpResult<'tcx> {
if ecx.mutex_is_locked(mutex) {
ecx.mutex_enqueue(mutex, thread);
} else {
ecx.mutex_lock(mutex, thread);
ecx.unblock_thread(thread)?;
}
Ok(())
}
/// Reacquire the conditional variable and remove the timeout callback if any
/// was registered.
fn post_cond_signal<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
thread: ThreadId,
mutex: MutexId,
) -> InterpResult<'tcx> {
reacquire_cond_mutex(ecx, thread, mutex)?;
// Waiting for the mutex is not included in the waiting time because we need
// to acquire the mutex always even if we get a timeout.
ecx.unregister_timeout_callback_if_exists(thread)
}
/// Release the mutex associated with the condition variable because we are
/// entering the waiting state.
fn release_cond_mutex<'mir, 'tcx: 'mir>(
ecx: &mut MiriEvalContext<'mir, 'tcx>,
active_thread: ThreadId,
mutex: MutexId,
) -> InterpResult<'tcx> {
if let Some(old_locked_count) = ecx.mutex_unlock(mutex, active_thread)? {
if old_locked_count != 1 {
throw_unsup_format!("awaiting on a lock acquired multiple times is not supported");
}
} else {
throw_ub_format!("awaiting on unlocked or owned by a different thread mutex");
}
ecx.block_thread(active_thread)?;
Ok(())
}
impl<'mir, 'tcx> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> {
fn pthread_mutexattr_init(&mut self, attr_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
@@ -288,7 +343,27 @@ fn pthread_mutexattr_settype(
let this = self.eval_context_mut();
let kind = this.read_scalar(kind_op)?.not_undef()?;
if kind == this.eval_libc("PTHREAD_MUTEX_NORMAL")?
if kind == this.eval_libc("PTHREAD_MUTEX_NORMAL")? {
// In `glibc` implementation, the numeric values of
// `PTHREAD_MUTEX_NORMAL` and `PTHREAD_MUTEX_DEFAULT` are equal.
// However, a mutex created by explicitly passing
// `PTHREAD_MUTEX_NORMAL` type has in some cases different behaviour
// from the default mutex for which the type was not explicitly
// specified. For a more detailed discussion, please see
// https://github.com/rust-lang/miri/issues/1419.
//
// To distinguish these two cases in already constructed mutexes, we
// use the same trick as glibc: for the case when
// `pthread_mutexattr_settype` is caled explicitly, we set the
// `PTHREAD_MUTEX_NORMAL_FLAG` flag.
let normal_kind = kind.to_i32()? | PTHREAD_MUTEX_NORMAL_FLAG;
// Check that after setting the flag, the kind is distinguishable
// from all other kinds.
assert_ne!(normal_kind, this.eval_libc("PTHREAD_MUTEX_DEFAULT")?.to_i32()?);
assert_ne!(normal_kind, this.eval_libc("PTHREAD_MUTEX_ERRORCHECK")?.to_i32()?);
assert_ne!(normal_kind, this.eval_libc("PTHREAD_MUTEX_RECURSIVE")?.to_i32()?);
mutexattr_set_kind(this, attr_op, Scalar::from_i32(normal_kind))?;
} else if kind == this.eval_libc("PTHREAD_MUTEX_DEFAULT")?
|| kind == this.eval_libc("PTHREAD_MUTEX_ERRORCHECK")?
|| kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")?
{
@@ -323,7 +398,9 @@ fn pthread_mutex_init(
mutexattr_get_kind(this, attr_op)?.not_undef()?
};
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(0))?;
// Write 0 to use the same code path as the static initializers.
mutex_set_id(this, mutex_op, Scalar::from_i32(0))?;
mutex_set_kind(this, mutex_op, kind)?;
Ok(0)
@@ -333,40 +410,37 @@ fn pthread_mutex_lock(&mut self, mutex_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx
let this = self.eval_context_mut();
let kind = mutex_get_kind(this, mutex_op)?.not_undef()?;
let locked_count = mutex_get_locked_count(this, mutex_op)?.to_u32()?;
let id = mutex_get_or_create_id(this, mutex_op)?;
let active_thread = this.get_active_thread()?;
if locked_count == 0 {
// The mutex is unlocked. Let's lock it.
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(1))?;
mutex_set_owner(this, mutex_op, active_thread.to_u32_scalar())?;
Ok(0)
} else {
// The mutex is locked. Let's check by whom.
let owner_thread: ThreadId = mutex_get_owner(this, mutex_op)?.to_u32()?.into();
if this.mutex_is_locked(id) {
let owner_thread = this.mutex_get_owner(id);
if owner_thread != active_thread {
// Block the active thread.
let blockset = mutex_get_or_create_blockset(this, mutex_op)?;
this.block_active_thread(blockset)?;
this.block_thread(active_thread)?;
this.mutex_enqueue(id, active_thread);
Ok(0)
} else {
// Trying to acquire the same mutex again.
if kind == this.eval_libc("PTHREAD_MUTEX_NORMAL")? {
if is_mutex_kind_default(this, kind)? {
throw_ub_format!("trying to acquire already locked default mutex");
} else if is_mutex_kind_normal(this, kind)? {
throw_machine_stop!(TerminationInfo::Deadlock);
} else if kind == this.eval_libc("PTHREAD_MUTEX_ERRORCHECK")? {
this.eval_libc_i32("EDEADLK")
} else if kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")? {
match locked_count.checked_add(1) {
Some(new_count) => {
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(new_count))?;
Ok(0)
}
None => this.eval_libc_i32("EAGAIN"),
}
this.mutex_lock(id, active_thread);
Ok(0)
} else {
throw_ub_format!("called pthread_mutex_lock on an unsupported type of mutex");
throw_unsup_format!(
"called pthread_mutex_lock on an unsupported type of mutex"
);
}
}
} else {
// The mutex is unlocked. Let's lock it.
this.mutex_lock(id, active_thread);
Ok(0)
}
}
@@ -374,37 +448,32 @@ fn pthread_mutex_trylock(&mut self, mutex_op: OpTy<'tcx, Tag>) -> InterpResult<'
let this = self.eval_context_mut();
let kind = mutex_get_kind(this, mutex_op)?.not_undef()?;
let locked_count = mutex_get_locked_count(this, mutex_op)?.to_u32()?;
let id = mutex_get_or_create_id(this, mutex_op)?;
let active_thread = this.get_active_thread()?;
if locked_count == 0 {
// The mutex is unlocked. Let's lock it.
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(1))?;
mutex_set_owner(this, mutex_op, active_thread.to_u32_scalar())?;
Ok(0)
} else {
let owner_thread: ThreadId = mutex_get_owner(this, mutex_op)?.to_u32()?.into();
if this.mutex_is_locked(id) {
let owner_thread = this.mutex_get_owner(id);
if owner_thread != active_thread {
this.eval_libc_i32("EBUSY")
} else {
if kind == this.eval_libc("PTHREAD_MUTEX_NORMAL")?
if is_mutex_kind_default(this, kind)?
|| is_mutex_kind_normal(this, kind)?
|| kind == this.eval_libc("PTHREAD_MUTEX_ERRORCHECK")?
{
this.eval_libc_i32("EBUSY")
} else if kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")? {
match locked_count.checked_add(1) {
Some(new_count) => {
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(new_count))?;
Ok(0)
}
None => this.eval_libc_i32("EAGAIN"),
}
this.mutex_lock(id, active_thread);
Ok(0)
} else {
throw_ub_format!(
throw_unsup_format!(
"called pthread_mutex_trylock on an unsupported type of mutex"
);
}
}
} else {
// The mutex is unlocked. Let's lock it.
this.mutex_lock(id, active_thread);
Ok(0)
}
}
@@ -412,41 +481,30 @@ fn pthread_mutex_unlock(&mut self, mutex_op: OpTy<'tcx, Tag>) -> InterpResult<'t
let this = self.eval_context_mut();
let kind = mutex_get_kind(this, mutex_op)?.not_undef()?;
let locked_count = mutex_get_locked_count(this, mutex_op)?.to_u32()?;
let owner_thread: ThreadId = mutex_get_owner(this, mutex_op)?.to_u32()?.into();
let id = mutex_get_or_create_id(this, mutex_op)?;
let active_thread = this.get_active_thread()?;
if owner_thread != this.get_active_thread()? {
throw_ub_format!("called pthread_mutex_unlock on a mutex owned by another thread");
} else if locked_count == 1 {
let blockset = mutex_get_or_create_blockset(this, mutex_op)?;
if let Some(new_owner) = this.unblock_some_thread(blockset)? {
// We have at least one thread waiting on this mutex. Transfer
// ownership to it.
mutex_set_owner(this, mutex_op, new_owner.to_u32_scalar())?;
} else {
// No thread is waiting on this mutex.
mutex_set_owner(this, mutex_op, Scalar::from_u32(0))?;
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(0))?;
}
if let Some(_old_locked_count) = this.mutex_unlock(id, active_thread)? {
// The mutex was locked by the current thread.
Ok(0)
} else {
if kind == this.eval_libc("PTHREAD_MUTEX_NORMAL")? {
throw_ub_format!("unlocked a PTHREAD_MUTEX_NORMAL mutex that was not locked");
// The mutex was locked by another thread or not locked at all. See
// the “Unlock When Not Owner” column in
// https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_unlock.html.
if is_mutex_kind_default(this, kind)? {
throw_ub_format!(
"unlocked a default mutex that was not locked by the current thread"
);
} else if is_mutex_kind_normal(this, kind)? {
throw_ub_format!(
"unlocked a PTHREAD_MUTEX_NORMAL mutex that was not locked by the current thread"
);
} else if kind == this.eval_libc("PTHREAD_MUTEX_ERRORCHECK")? {
this.eval_libc_i32("EPERM")
} else if kind == this.eval_libc("PTHREAD_MUTEX_RECURSIVE")? {
match locked_count.checked_sub(1) {
Some(new_count) => {
mutex_set_locked_count(this, mutex_op, Scalar::from_u32(new_count))?;
Ok(0)
}
None => {
// locked_count was already zero
this.eval_libc_i32("EPERM")
}
}
this.eval_libc_i32("EPERM")
} else {
throw_ub_format!("called pthread_mutex_unlock on an unsupported type of mutex");
throw_unsup_format!("called pthread_mutex_unlock on an unsupported type of mutex");
}
}
}
@@ -454,13 +512,14 @@ fn pthread_mutex_unlock(&mut self, mutex_op: OpTy<'tcx, Tag>) -> InterpResult<'t
fn pthread_mutex_destroy(&mut self, mutex_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
if mutex_get_locked_count(this, mutex_op)?.to_u32()? != 0 {
let id = mutex_get_or_create_id(this, mutex_op)?;
if this.mutex_is_locked(id) {
throw_ub_format!("destroyed a locked mutex");
}
mutex_set_kind(this, mutex_op, ScalarMaybeUninit::Uninit)?;
mutex_set_locked_count(this, mutex_op, ScalarMaybeUninit::Uninit)?;
mutex_set_blockset(this, mutex_op, ScalarMaybeUninit::Uninit)?;
mutex_set_id(this, mutex_op, ScalarMaybeUninit::Uninit)?;
Ok(0)
}
@@ -468,121 +527,316 @@ fn pthread_mutex_destroy(&mut self, mutex_op: OpTy<'tcx, Tag>) -> InterpResult<'
fn pthread_rwlock_rdlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if writers != 0 {
// The lock is locked by a writer.
assert_eq!(writers, 1);
let reader_blockset = rwlock_get_or_create_reader_blockset(this, rwlock_op)?;
this.block_active_thread(reader_blockset)?;
if this.rwlock_is_write_locked(id) {
this.rwlock_enqueue_and_block_reader(id, active_thread)?;
Ok(0)
} else {
match readers.checked_add(1) {
Some(new_readers) => {
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?;
Ok(0)
}
None => this.eval_libc_i32("EAGAIN"),
}
this.rwlock_reader_lock(id, active_thread);
Ok(0)
}
}
fn pthread_rwlock_tryrdlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
if writers != 0 {
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if this.rwlock_is_write_locked(id) {
this.eval_libc_i32("EBUSY")
} else {
match readers.checked_add(1) {
Some(new_readers) => {
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?;
Ok(0)
}
None => this.eval_libc_i32("EAGAIN"),
}
this.rwlock_reader_lock(id, active_thread);
Ok(0)
}
}
fn pthread_rwlock_wrlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
let writer_blockset = rwlock_get_or_create_writer_blockset(this, rwlock_op)?;
if readers != 0 || writers != 0 {
this.block_active_thread(writer_blockset)?;
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if this.rwlock_is_locked(id) {
// Note: this will deadlock if the lock is already locked by this
// thread in any way.
//
// Relevant documentation:
// https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_rwlock_wrlock.html
// An in depth discussion on this topic:
// https://github.com/rust-lang/rust/issues/53127
//
// FIXME: Detect and report the deadlock proactively. (We currently
// report the deadlock only when no thread can continue execution,
// but we could detect that this lock is already locked and report
// an error.)
this.rwlock_enqueue_and_block_writer(id, active_thread)?;
} else {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
this.rwlock_writer_lock(id, active_thread);
}
Ok(0)
}
fn pthread_rwlock_trywrlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
if readers != 0 || writers != 0 {
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if this.rwlock_is_locked(id) {
this.eval_libc_i32("EBUSY")
} else {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
this.rwlock_writer_lock(id, active_thread);
Ok(0)
}
}
// FIXME: We should check that this lock was locked by the active thread.
fn pthread_rwlock_unlock(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let readers = rwlock_get_readers(this, rwlock_op)?.to_u32()?;
let writers = rwlock_get_writers(this, rwlock_op)?.to_u32()?;
let writer_blockset = rwlock_get_or_create_writer_blockset(this, rwlock_op)?;
if let Some(new_readers) = readers.checked_sub(1) {
assert_eq!(writers, 0);
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?;
if new_readers == 0 {
if let Some(_writer) = this.unblock_some_thread(writer_blockset)? {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
let id = rwlock_get_or_create_id(this, rwlock_op)?;
let active_thread = this.get_active_thread()?;
if this.rwlock_reader_unlock(id, active_thread) {
// The thread was a reader.
if this.rwlock_is_locked(id) {
// No more readers owning the lock. Give it to a writer if there
// is any.
if let Some(writer) = this.rwlock_dequeue_writer(id) {
this.unblock_thread(writer)?;
this.rwlock_writer_lock(id, writer);
}
}
Ok(0)
} else if writers != 0 {
let reader_blockset = rwlock_get_or_create_reader_blockset(this, rwlock_op)?;
} else if Some(active_thread) == this.rwlock_writer_unlock(id) {
// The thread was a writer.
//
// We are prioritizing writers here against the readers. As a
// result, not only readers can starve writers, but also writers can
// starve readers.
if let Some(_writer) = this.unblock_some_thread(writer_blockset)? {
assert_eq!(writers, 1);
if let Some(writer) = this.rwlock_dequeue_writer(id) {
// Give the lock to another writer.
this.unblock_thread(writer)?;
this.rwlock_writer_lock(id, writer);
} else {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(0))?;
let mut readers = 0;
while let Some(_reader) = this.unblock_some_thread(reader_blockset)? {
readers += 1;
// Give the lock to all readers.
while let Some(reader) = this.rwlock_dequeue_reader(id) {
this.unblock_thread(reader)?;
this.rwlock_reader_lock(id, reader);
}
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(readers))?
}
Ok(0)
} else {
throw_ub_format!("unlocked an rwlock that was not locked");
throw_ub_format!("unlocked an rwlock that was not locked by the active thread");
}
}
fn pthread_rwlock_destroy(&mut self, rwlock_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
if rwlock_get_readers(this, rwlock_op)?.to_u32()? != 0
|| rwlock_get_writers(this, rwlock_op)?.to_u32()? != 0
{
let id = rwlock_get_or_create_id(this, rwlock_op)?;
if this.rwlock_is_locked(id) {
throw_ub_format!("destroyed a locked rwlock");
}
rwlock_set_readers(this, rwlock_op, ScalarMaybeUninit::Uninit)?;
rwlock_set_writers(this, rwlock_op, ScalarMaybeUninit::Uninit)?;
rwlock_set_reader_blockset(this, rwlock_op, ScalarMaybeUninit::Uninit)?;
rwlock_set_writer_blockset(this, rwlock_op, ScalarMaybeUninit::Uninit)?;
rwlock_set_id(this, rwlock_op, ScalarMaybeUninit::Uninit)?;
Ok(0)
}
fn pthread_condattr_init(&mut self, attr_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
// The default value of the clock attribute shall refer to the system
// clock.
// https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_condattr_setclock.html
let default_clock_id = this.eval_libc("CLOCK_REALTIME")?;
condattr_set_clock_id(this, attr_op, default_clock_id)?;
Ok(0)
}
fn pthread_condattr_setclock(
&mut self,
attr_op: OpTy<'tcx, Tag>,
clock_id_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let clock_id = this.read_scalar(clock_id_op)?.not_undef()?;
if clock_id == this.eval_libc("CLOCK_REALTIME")?
|| clock_id == this.eval_libc("CLOCK_MONOTONIC")?
{
condattr_set_clock_id(this, attr_op, clock_id)?;
} else {
let einval = this.eval_libc_i32("EINVAL")?;
return Ok(einval);
}
Ok(0)
}
fn pthread_condattr_getclock(
&mut self,
attr_op: OpTy<'tcx, Tag>,
clk_id_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let clock_id = condattr_get_clock_id(this, attr_op)?;
this.write_scalar(clock_id, this.deref_operand(clk_id_op)?.into())?;
Ok(0)
}
fn pthread_condattr_destroy(&mut self, attr_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
condattr_set_clock_id(this, attr_op, ScalarMaybeUninit::Uninit)?;
Ok(0)
}
fn pthread_cond_init(
&mut self,
cond_op: OpTy<'tcx, Tag>,
attr_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let attr = this.read_scalar(attr_op)?.not_undef()?;
let clock_id = if this.is_null(attr)? {
this.eval_libc("CLOCK_REALTIME")?
} else {
condattr_get_clock_id(this, attr_op)?.not_undef()?
};
// Write 0 to use the same code path as the static initializers.
cond_set_id(this, cond_op, Scalar::from_i32(0))?;
cond_set_clock_id(this, cond_op, clock_id)?;
Ok(0)
}
fn pthread_cond_signal(&mut self, cond_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let id = cond_get_or_create_id(this, cond_op)?;
if let Some((thread, mutex)) = this.condvar_signal(id) {
post_cond_signal(this, thread, mutex)?;
}
Ok(0)
}
fn pthread_cond_broadcast(&mut self, cond_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let id = cond_get_or_create_id(this, cond_op)?;
while let Some((thread, mutex)) = this.condvar_signal(id) {
post_cond_signal(this, thread, mutex)?;
}
Ok(0)
}
fn pthread_cond_wait(
&mut self,
cond_op: OpTy<'tcx, Tag>,
mutex_op: OpTy<'tcx, Tag>,
) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let id = cond_get_or_create_id(this, cond_op)?;
let mutex_id = mutex_get_or_create_id(this, mutex_op)?;
let active_thread = this.get_active_thread()?;
release_cond_mutex(this, active_thread, mutex_id)?;
this.condvar_wait(id, active_thread, mutex_id);
Ok(0)
}
fn pthread_cond_timedwait(
&mut self,
cond_op: OpTy<'tcx, Tag>,
mutex_op: OpTy<'tcx, Tag>,
abstime_op: OpTy<'tcx, Tag>,
dest: PlaceTy<'tcx, Tag>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
this.check_no_isolation("pthread_cond_timedwait")?;
let id = cond_get_or_create_id(this, cond_op)?;
let mutex_id = mutex_get_or_create_id(this, mutex_op)?;
let active_thread = this.get_active_thread()?;
release_cond_mutex(this, active_thread, mutex_id)?;
this.condvar_wait(id, active_thread, mutex_id);
// We return success for now and override it in the timeout callback.
this.write_scalar(Scalar::from_i32(0), dest)?;
// Extract the timeout.
let clock_id = cond_get_clock_id(this, cond_op)?.to_i32()?;
let duration = {
let tp = this.deref_operand(abstime_op)?;
let seconds_place = this.mplace_field(tp, 0)?;
let seconds = this.read_scalar(seconds_place.into())?;
let nanoseconds_place = this.mplace_field(tp, 1)?;
let nanoseconds = this.read_scalar(nanoseconds_place.into())?;
let (seconds, nanoseconds) = (
seconds.to_machine_usize(this)?,
nanoseconds.to_machine_usize(this)?.try_into().unwrap(),
);
Duration::new(seconds, nanoseconds)
};
let timeout_time = if clock_id == this.eval_libc_i32("CLOCK_REALTIME")? {
Time::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
} else if clock_id == this.eval_libc_i32("CLOCK_MONOTONIC")? {
Time::Monotonic(this.machine.time_anchor.checked_add(duration).unwrap())
} else {
throw_unsup_format!("unsupported clock id: {}", clock_id);
};
// Register the timeout callback.
this.register_timeout_callback(
active_thread,
timeout_time,
Box::new(move |ecx| {
// We are not waiting for the condvar any more, wait for the
// mutex instead.
reacquire_cond_mutex(ecx, active_thread, mutex_id)?;
// Remove the thread from the conditional variable.
ecx.condvar_remove_waiter(id, active_thread);
// Set the return value: we timed out.
let timeout = ecx.eval_libc_i32("ETIMEDOUT")?;
ecx.write_scalar(Scalar::from_i32(timeout), dest)?;
Ok(())
}),
)?;
Ok(())
}
fn pthread_cond_destroy(&mut self, cond_op: OpTy<'tcx, Tag>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let id = cond_get_or_create_id(this, cond_op)?;
if this.condvar_is_awaited(id) {
throw_ub_format!("destroyed an awaited conditional variable");
}
cond_set_id(this, cond_op, ScalarMaybeUninit::Uninit)?;
cond_set_clock_id(this, cond_op, ScalarMaybeUninit::Uninit)?;
Ok(0)
}
+344
View File
@@ -0,0 +1,344 @@
use std::collections::{hash_map::Entry, HashMap, VecDeque};
use std::convert::TryFrom;
use std::num::NonZeroU32;
use std::ops::Not;
use rustc_index::vec::{Idx, IndexVec};
use crate::*;
/// We cannot use the `newtype_index!` macro because we have to use 0 as a
/// sentinel value meaning that the identifier is not assigned. This is because
/// the pthreads static initializers initialize memory with zeros (see the
/// `src/shims/sync.rs` file).
macro_rules! declare_id {
($name: ident) => {
/// 0 is used to indicate that the id was not yet assigned and,
/// therefore, is not a valid identifier.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct $name(NonZeroU32);
impl $name {
// Panics if `id == 0`.
pub fn from_u32(id: u32) -> Self {
Self(NonZeroU32::new(id).unwrap())
}
}
impl Idx for $name {
fn new(idx: usize) -> Self {
// We use 0 as a sentinel value (see the comment above) and,
// therefore, need to shift by one when converting from an index
// into a vector.
let shifted_idx = u32::try_from(idx).unwrap().checked_add(1).unwrap();
$name(NonZeroU32::new(shifted_idx).unwrap())
}
fn index(self) -> usize {
// See the comment in `Self::new`.
// (This cannot underflow because self is NonZeroU32.)
usize::try_from(self.0.get() - 1).unwrap()
}
}
impl $name {
pub fn to_u32_scalar<'tcx>(&self) -> Scalar<Tag> {
Scalar::from_u32(self.0.get())
}
}
};
}
declare_id!(MutexId);
/// The mutex state.
#[derive(Default, Debug)]
struct Mutex {
/// The thread that currently owns the lock.
owner: Option<ThreadId>,
/// How many times the mutex was locked by the owner.
lock_count: usize,
/// The queue of threads waiting for this mutex.
queue: VecDeque<ThreadId>,
}
declare_id!(RwLockId);
/// The read-write lock state.
#[derive(Default, Debug)]
struct RwLock {
/// The writer thread that currently owns the lock.
writer: Option<ThreadId>,
/// The readers that currently own the lock and how many times they acquired
/// the lock.
readers: HashMap<ThreadId, usize>,
/// The queue of writer threads waiting for this lock.
writer_queue: VecDeque<ThreadId>,
/// The queue of reader threads waiting for this lock.
reader_queue: VecDeque<ThreadId>,
}
declare_id!(CondvarId);
/// A thread waiting on a conditional variable.
#[derive(Debug)]
struct CondvarWaiter {
/// The thread that is waiting on this variable.
thread: ThreadId,
/// The mutex on which the thread is waiting.
mutex: MutexId,
}
/// The conditional variable state.
#[derive(Default, Debug)]
struct Condvar {
waiters: VecDeque<CondvarWaiter>,
}
/// The state of all synchronization variables.
#[derive(Default, Debug)]
pub(super) struct SynchronizationState {
mutexes: IndexVec<MutexId, Mutex>,
rwlocks: IndexVec<RwLockId, RwLock>,
condvars: IndexVec<CondvarId, Condvar>,
}
// Public interface to synchronization primitives. Please note that in most
// cases, the function calls are infallible and it is the client's (shim
// implementation's) responsibility to detect and deal with erroneous
// situations.
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> {
#[inline]
/// Create state for a new mutex.
fn mutex_create(&mut self) -> MutexId {
let this = self.eval_context_mut();
this.machine.threads.sync.mutexes.push(Default::default())
}
#[inline]
/// Get the id of the thread that currently owns this lock.
fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId {
let this = self.eval_context_ref();
this.machine.threads.sync.mutexes[id].owner.unwrap()
}
#[inline]
/// Check if locked.
fn mutex_is_locked(&mut self, id: MutexId) -> bool {
let this = self.eval_context_mut();
this.machine.threads.sync.mutexes[id].owner.is_some()
}
/// Lock by setting the mutex owner and increasing the lock count.
fn mutex_lock(&mut self, id: MutexId, thread: ThreadId) {
let this = self.eval_context_mut();
let mutex = &mut this.machine.threads.sync.mutexes[id];
if let Some(current_owner) = mutex.owner {
assert_eq!(thread, current_owner, "mutex already locked by another thread");
assert!(
mutex.lock_count > 0,
"invariant violation: lock_count == 0 iff the thread is unlocked"
);
} else {
mutex.owner = Some(thread);
}
mutex.lock_count = mutex.lock_count.checked_add(1).unwrap();
}
/// Try unlocking by decreasing the lock count and returning the old owner
/// and the old lock count. If the lock count reaches 0, release the lock
/// and potentially give to a new owner. If the lock was not locked, return
/// `None`.
///
/// Note: It is the caller's responsibility to check that the thread that
/// unlocked the lock actually is the same one, which owned it.
fn mutex_unlock(
&mut self,
id: MutexId,
expected_owner: ThreadId,
) -> InterpResult<'tcx, Option<usize>> {
let this = self.eval_context_mut();
let mutex = &mut this.machine.threads.sync.mutexes[id];
if let Some(current_owner) = mutex.owner {
// Mutex is locked.
if current_owner != expected_owner {
// Only the owner can unlock the mutex.
return Ok(None);
}
let old_lock_count = mutex.lock_count;
mutex.lock_count = old_lock_count
.checked_sub(1)
.expect("invariant violation: lock_count == 0 iff the thread is unlocked");
if mutex.lock_count == 0 {
mutex.owner = None;
// The mutex is completely unlocked. Try transfering ownership
// to another thread.
if let Some(new_owner) = this.mutex_dequeue(id) {
this.mutex_lock(id, new_owner);
this.unblock_thread(new_owner)?;
}
}
Ok(Some(old_lock_count))
} else {
// Mutex is unlocked.
Ok(None)
}
}
#[inline]
/// Put the thread into the queue waiting for the lock.
fn mutex_enqueue(&mut self, id: MutexId, thread: ThreadId) {
let this = self.eval_context_mut();
assert!(this.mutex_is_locked(id), "queing on unlocked mutex");
this.machine.threads.sync.mutexes[id].queue.push_back(thread);
}
#[inline]
/// Take a thread out of the queue waiting for the lock.
fn mutex_dequeue(&mut self, id: MutexId) -> Option<ThreadId> {
let this = self.eval_context_mut();
this.machine.threads.sync.mutexes[id].queue.pop_front()
}
#[inline]
/// Create state for a new read write lock.
fn rwlock_create(&mut self) -> RwLockId {
let this = self.eval_context_mut();
this.machine.threads.sync.rwlocks.push(Default::default())
}
#[inline]
/// Check if locked.
fn rwlock_is_locked(&mut self, id: RwLockId) -> bool {
let this = self.eval_context_mut();
this.machine.threads.sync.rwlocks[id].writer.is_some()
|| this.machine.threads.sync.rwlocks[id].readers.is_empty().not()
}
#[inline]
/// Check if write locked.
fn rwlock_is_write_locked(&mut self, id: RwLockId) -> bool {
let this = self.eval_context_mut();
this.machine.threads.sync.rwlocks[id].writer.is_some()
}
/// Read-lock the lock by adding the `reader` the list of threads that own
/// this lock.
fn rwlock_reader_lock(&mut self, id: RwLockId, reader: ThreadId) {
let this = self.eval_context_mut();
assert!(!this.rwlock_is_write_locked(id), "the lock is write locked");
let count = this.machine.threads.sync.rwlocks[id].readers.entry(reader).or_insert(0);
*count = count.checked_add(1).expect("the reader counter overflowed");
}
/// Try read-unlock the lock for `reader`. Returns `true` if succeeded,
/// `false` if this `reader` did not hold the lock.
fn rwlock_reader_unlock(&mut self, id: RwLockId, reader: ThreadId) -> bool {
let this = self.eval_context_mut();
match this.machine.threads.sync.rwlocks[id].readers.entry(reader) {
Entry::Occupied(mut entry) => {
let count = entry.get_mut();
*count -= 1;
if *count == 0 {
entry.remove();
}
true
}
Entry::Vacant(_) => false,
}
}
#[inline]
/// Put the reader in the queue waiting for the lock and block it.
fn rwlock_enqueue_and_block_reader(
&mut self,
id: RwLockId,
reader: ThreadId,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
assert!(this.rwlock_is_write_locked(id), "queueing on not write locked lock");
this.machine.threads.sync.rwlocks[id].reader_queue.push_back(reader);
this.block_thread(reader)
}
#[inline]
/// Take a reader out the queue waiting for the lock.
fn rwlock_dequeue_reader(&mut self, id: RwLockId) -> Option<ThreadId> {
let this = self.eval_context_mut();
this.machine.threads.sync.rwlocks[id].reader_queue.pop_front()
}
#[inline]
/// Lock by setting the writer that owns the lock.
fn rwlock_writer_lock(&mut self, id: RwLockId, writer: ThreadId) {
let this = self.eval_context_mut();
assert!(!this.rwlock_is_locked(id), "the lock is already locked");
this.machine.threads.sync.rwlocks[id].writer = Some(writer);
}
#[inline]
/// Try to unlock by removing the writer.
fn rwlock_writer_unlock(&mut self, id: RwLockId) -> Option<ThreadId> {
let this = self.eval_context_mut();
this.machine.threads.sync.rwlocks[id].writer.take()
}
#[inline]
/// Put the writer in the queue waiting for the lock.
fn rwlock_enqueue_and_block_writer(
&mut self,
id: RwLockId,
writer: ThreadId,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
assert!(this.rwlock_is_locked(id), "queueing on unlocked lock");
this.machine.threads.sync.rwlocks[id].writer_queue.push_back(writer);
this.block_thread(writer)
}
#[inline]
/// Take the writer out the queue waiting for the lock.
fn rwlock_dequeue_writer(&mut self, id: RwLockId) -> Option<ThreadId> {
let this = self.eval_context_mut();
this.machine.threads.sync.rwlocks[id].writer_queue.pop_front()
}
#[inline]
/// Create state for a new conditional variable.
fn condvar_create(&mut self) -> CondvarId {
let this = self.eval_context_mut();
this.machine.threads.sync.condvars.push(Default::default())
}
#[inline]
/// Is the conditional variable awaited?
fn condvar_is_awaited(&mut self, id: CondvarId) -> bool {
let this = self.eval_context_mut();
!this.machine.threads.sync.condvars[id].waiters.is_empty()
}
/// Mark that the thread is waiting on the conditional variable.
fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, mutex: MutexId) {
let this = self.eval_context_mut();
let waiters = &mut this.machine.threads.sync.condvars[id].waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
waiters.push_back(CondvarWaiter { thread, mutex });
}
/// Wake up some thread (if there is any) sleeping on the conditional
/// variable.
fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, MutexId)> {
let this = self.eval_context_mut();
this.machine.threads.sync.condvars[id]
.waiters
.pop_front()
.map(|waiter| (waiter.thread, waiter.mutex))
}
#[inline]
/// Remove the thread from the queue of threads waiting on this conditional variable.
fn condvar_remove_waiter(&mut self, id: CondvarId, thread: ThreadId) {
let this = self.eval_context_mut();
this.machine.threads.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread);
}
}
+164 -53
View File
@@ -1,8 +1,10 @@
//! Implements threads.
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::convert::TryFrom;
use std::num::{NonZeroU32, TryFromIntError};
use std::num::TryFromIntError;
use std::time::{Duration, Instant, SystemTime};
use log::trace;
@@ -15,18 +17,26 @@
ty::{self, Instance},
};
use crate::sync::SynchronizationState;
use crate::*;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SchedulingAction {
/// Execute step on the active thread.
ExecuteStep,
/// Execute a timeout callback.
ExecuteTimeoutCallback,
/// Execute destructors of the active thread.
ExecuteDtors,
/// Stop the program.
Stop,
}
/// Timeout callbacks can be created by synchronization primitives to tell the
/// scheduler that they should be called once some period of time passes.
type TimeoutCallback<'mir, 'tcx> =
Box<dyn FnOnce(&mut InterpCx<'mir, 'tcx, Evaluator<'mir, 'tcx>>) -> InterpResult<'tcx> + 'tcx>;
/// A thread identifier.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ThreadId(u32);
@@ -69,21 +79,6 @@ pub fn to_u32_scalar<'tcx>(&self) -> Scalar<Tag> {
}
}
/// An identifier of a set of blocked threads. 0 is used to indicate the absence
/// of a blockset identifier and, therefore, is not a valid identifier.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct BlockSetId(NonZeroU32);
impl BlockSetId {
/// Panics if `id` is 0.
pub fn new(id: u32) -> Self {
Self(NonZeroU32::new(id).expect("0 is not a valid blockset id"))
}
pub fn to_u32_scalar<'tcx>(&self) -> Scalar<Tag> {
Scalar::from_u32(self.0.get())
}
}
/// The state of a thread.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ThreadState {
@@ -92,8 +87,10 @@ pub enum ThreadState {
/// The thread tried to join the specified thread and is blocked until that
/// thread terminates.
BlockedOnJoin(ThreadId),
/// The thread is blocked and belongs to the given blockset.
Blocked(BlockSetId),
/// The thread is blocked on some synchronization primitive. It is the
/// responsibility of the synchronization primitives to track threads that
/// are blocked by them.
BlockedOnSync,
/// The thread has terminated its execution (we do not delete terminated
/// threads).
Terminated,
@@ -162,6 +159,41 @@ fn default() -> Self {
}
}
/// A specific moment in time.
#[derive(Debug)]
pub enum Time {
Monotonic(Instant),
RealTime(SystemTime),
}
impl Time {
/// How long do we have to wait from now until the specified time?
fn get_wait_time(&self) -> Duration {
match self {
Time::Monotonic(instant) => instant.saturating_duration_since(Instant::now()),
Time::RealTime(time) =>
time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)),
}
}
}
/// Callbacks are used to implement timeouts. For example, waiting on a
/// conditional variable with a timeout creates a callback that is called after
/// the specified time and unblocks the thread. If another thread signals on the
/// conditional variable, the signal handler deletes the callback.
struct TimeoutCallbackInfo<'mir, 'tcx> {
/// The callback should be called no earlier than this time.
call_time: Time,
/// The called function.
callback: TimeoutCallback<'mir, 'tcx>,
}
impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TimeoutCallback({:?})", self.call_time)
}
}
/// A set of threads.
#[derive(Debug)]
pub struct ThreadManager<'mir, 'tcx> {
@@ -171,13 +203,16 @@ pub struct ThreadManager<'mir, 'tcx> {
///
/// Note that this vector also contains terminated threads.
threads: IndexVec<ThreadId, Thread<'mir, 'tcx>>,
/// A counter used to generate unique identifiers for blocksets.
blockset_counter: u32,
/// This field is pub(crate) because the synchronization primitives
/// (`crate::sync`) need a way to access it.
pub(crate) sync: SynchronizationState,
/// A mapping from a thread-local static to an allocation id of a thread
/// specific allocation.
thread_local_alloc_ids: RefCell<FxHashMap<(DefId, ThreadId), AllocId>>,
/// A flag that indicates that we should change the active thread.
yield_active_thread: bool,
/// Callbacks that are called once the specified time passes.
timeout_callbacks: FxHashMap<ThreadId, TimeoutCallbackInfo<'mir, 'tcx>>,
}
impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
@@ -191,9 +226,10 @@ fn default() -> Self {
Self {
active_thread: ThreadId::new(0),
threads: threads,
blockset_counter: 0,
sync: SynchronizationState::default(),
thread_local_alloc_ids: Default::default(),
yield_active_thread: false,
timeout_callbacks: FxHashMap::default(),
}
}
}
@@ -321,30 +357,18 @@ fn get_thread_name(&self) -> &[u8] {
self.active_thread_ref().thread_name()
}
/// Allocate a new blockset id.
fn create_blockset(&mut self) -> BlockSetId {
self.blockset_counter = self.blockset_counter.checked_add(1).unwrap();
BlockSetId::new(self.blockset_counter)
}
/// Block the currently active thread and put it into the given blockset.
fn block_active_thread(&mut self, set: BlockSetId) {
let state = &mut self.active_thread_mut().state;
/// Put the thread into the blocked state.
fn block_thread(&mut self, thread: ThreadId) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::Enabled);
*state = ThreadState::Blocked(set);
*state = ThreadState::BlockedOnSync;
}
/// Unblock any one thread from the given blockset if it contains at least
/// one. Return the id of the unblocked thread.
fn unblock_some_thread(&mut self, set: BlockSetId) -> Option<ThreadId> {
for (id, thread) in self.threads.iter_enumerated_mut() {
if thread.state == ThreadState::Blocked(set) {
trace!("unblocking {:?} in blockset {:?}", id, set);
thread.state = ThreadState::Enabled;
return Some(id);
}
}
None
/// Put the blocked thread into the enabled state.
fn unblock_thread(&mut self, thread: ThreadId) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::BlockedOnSync);
*state = ThreadState::Enabled;
}
/// Change the active thread to some enabled thread.
@@ -352,6 +376,42 @@ fn yield_active_thread(&mut self) {
self.yield_active_thread = true;
}
/// Register the given `callback` to be called once the `call_time` passes.
///
/// The callback will be called with `thread` being the active thread, and
/// the callback may not change the active thread.
fn register_timeout_callback(
&mut self,
thread: ThreadId,
call_time: Time,
callback: TimeoutCallback<'mir, 'tcx>,
) {
self.timeout_callbacks
.insert(thread, TimeoutCallbackInfo { call_time, callback })
.unwrap_none();
}
/// Unregister the callback for the `thread`.
fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
self.timeout_callbacks.remove(&thread);
}
/// Get a callback that is ready to be called.
fn get_ready_callback(&mut self) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> {
// We iterate over all threads in the order of their indices because
// this allows us to have a deterministic scheduler.
for thread in self.threads.indices() {
match self.timeout_callbacks.entry(thread) {
Entry::Occupied(entry) =>
if entry.get().call_time.get_wait_time() == Duration::new(0, 0) {
return Some((thread, entry.remove().callback));
},
Entry::Vacant(_) => {}
}
}
None
}
/// Decide which action to take next and on which thread.
///
/// The currently implemented scheduling policy is the one that is commonly
@@ -385,6 +445,20 @@ fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
}
return Ok(SchedulingAction::Stop);
}
// At least for `pthread_cond_timedwait` we need to report timeout when
// the function is called already after the specified time even if a
// signal is received before the thread gets scheduled. Therefore, we
// need to schedule all timeout callbacks before we continue regular
// execution.
//
// Documentation:
// https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html#
let potential_sleep_time =
self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time()).min();
if potential_sleep_time == Some(Duration::new(0, 0)) {
return Ok(SchedulingAction::ExecuteTimeoutCallback);
}
// No callbacks scheduled, pick a regular thread to execute.
if self.threads[self.active_thread].state == ThreadState::Enabled
&& !self.yield_active_thread
{
@@ -406,7 +480,13 @@ fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
}
// We have not found a thread to execute.
if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
unreachable!();
unreachable!("all threads terminated without the main thread terminating?!");
} else if let Some(sleep_time) = potential_sleep_time {
// All threads are currently blocked, but we have unexecuted
// timeout_callbacks, which may unblock some of the threads. Hence,
// sleep until the first callback.
std::thread::sleep(sleep_time);
Ok(SchedulingAction::ExecuteTimeoutCallback)
} else {
throw_machine_stop!(TerminationInfo::Deadlock);
}
@@ -577,21 +657,15 @@ fn get_active_thread_name<'c>(&'c self) -> InterpResult<'tcx, &'c [u8]>
}
#[inline]
fn create_blockset(&mut self) -> InterpResult<'tcx, BlockSetId> {
fn block_thread(&mut self, thread: ThreadId) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
Ok(this.machine.threads.create_blockset())
Ok(this.machine.threads.block_thread(thread))
}
#[inline]
fn block_active_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx> {
fn unblock_thread(&mut self, thread: ThreadId) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
Ok(this.machine.threads.block_active_thread(set))
}
#[inline]
fn unblock_some_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option<ThreadId>> {
let this = self.eval_context_mut();
Ok(this.machine.threads.unblock_some_thread(set))
Ok(this.machine.threads.unblock_thread(thread))
}
#[inline]
@@ -601,6 +675,43 @@ fn yield_active_thread(&mut self) -> InterpResult<'tcx> {
Ok(())
}
#[inline]
fn register_timeout_callback(
&mut self,
thread: ThreadId,
call_time: Time,
callback: TimeoutCallback<'mir, 'tcx>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
this.machine.threads.register_timeout_callback(thread, call_time, callback);
Ok(())
}
#[inline]
fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
this.machine.threads.unregister_timeout_callback_if_exists(thread);
Ok(())
}
/// Execute a timeout callback on the callback's thread.
#[inline]
fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let (thread, callback) =
this.machine.threads.get_ready_callback().expect("no callback found");
// This back-and-forth with `set_active_thread` is here because of two
// design decisions:
// 1. Make the caller and not the callback responsible for changing
// thread.
// 2. Make the scheduler the only place that can change the active
// thread.
let old_thread = this.set_active_thread(thread)?;
callback(this)?;
this.set_active_thread(old_thread)?;
Ok(())
}
/// Decide which action to take next and on which thread.
#[inline]
fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
@@ -0,0 +1,16 @@
// ignore-windows: No libc on Windows
//
// Check that if we pass NULL attribute, then we get the default mutex type.
#![feature(rustc_private)]
extern crate libc;
fn main() {
unsafe {
let mut mutex: libc::pthread_mutex_t = std::mem::zeroed();
assert_eq!(libc::pthread_mutex_init(&mut mutex as *mut _, std::ptr::null() as *const _), 0);
assert_eq!(libc::pthread_mutex_lock(&mut mutex as *mut _), 0);
libc::pthread_mutex_lock(&mut mutex as *mut _); //~ ERROR Undefined Behavior: trying to acquire already locked default mutex
}
}
@@ -0,0 +1,17 @@
// ignore-windows: No libc on Windows
//
// Check that if we do not set the mutex type, it is the default.
#![feature(rustc_private)]
extern crate libc;
fn main() {
unsafe {
let mutexattr: libc::pthread_mutexattr_t = std::mem::zeroed();
let mut mutex: libc::pthread_mutex_t = std::mem::zeroed();
assert_eq!(libc::pthread_mutex_init(&mut mutex as *mut _, &mutexattr as *const _), 0);
assert_eq!(libc::pthread_mutex_lock(&mut mutex as *mut _), 0);
libc::pthread_mutex_lock(&mut mutex as *mut _); //~ ERROR Undefined Behavior: trying to acquire already locked default mutex
}
}
@@ -11,6 +11,6 @@ fn main() {
let mut mutex: libc::pthread_mutex_t = std::mem::zeroed();
assert_eq!(libc::pthread_mutex_init(&mut mutex as *mut _, &mutexattr as *const _), 0);
assert_eq!(libc::pthread_mutex_lock(&mut mutex as *mut _), 0);
libc::pthread_mutex_lock(&mut mutex as *mut _); //~ ERROR deadlock
libc::pthread_mutex_lock(&mut mutex as *mut _); //~ ERROR deadlock: the evaluated program deadlocked
}
}
@@ -24,7 +24,7 @@ fn main() {
let lock_copy = lock.clone();
thread::spawn(move || {
assert_eq!(libc::pthread_mutex_unlock(lock_copy.0.get() as *mut _), 0); //~ ERROR: Undefined Behavior: called pthread_mutex_unlock on a mutex owned by another thread
assert_eq!(libc::pthread_mutex_unlock(lock_copy.0.get() as *mut _), 0); //~ ERROR: Undefined Behavior: unlocked a default mutex that was not locked by the current thread
})
.join()
.unwrap();
@@ -0,0 +1,32 @@
// ignore-windows: No libc on Windows
#![feature(rustc_private)]
extern crate libc;
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::thread;
struct RwLock(UnsafeCell<libc::pthread_rwlock_t>);
unsafe impl Send for RwLock {}
unsafe impl Sync for RwLock {}
fn new_lock() -> Arc<RwLock> {
Arc::new(RwLock(UnsafeCell::new(libc::PTHREAD_RWLOCK_INITIALIZER)))
}
fn main() {
unsafe {
let lock = new_lock();
assert_eq!(libc::pthread_rwlock_rdlock(lock.0.get() as *mut _), 0);
let lock_copy = lock.clone();
thread::spawn(move || {
assert_eq!(libc::pthread_rwlock_unlock(lock_copy.0.get() as *mut _), 0); //~ ERROR: Undefined Behavior: unlocked an rwlock that was not locked by the active thread
})
.join()
.unwrap();
}
}
@@ -0,0 +1,32 @@
// ignore-windows: No libc on Windows
#![feature(rustc_private)]
extern crate libc;
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::thread;
struct RwLock(UnsafeCell<libc::pthread_rwlock_t>);
unsafe impl Send for RwLock {}
unsafe impl Sync for RwLock {}
fn new_lock() -> Arc<RwLock> {
Arc::new(RwLock(UnsafeCell::new(libc::PTHREAD_RWLOCK_INITIALIZER)))
}
fn main() {
unsafe {
let lock = new_lock();
assert_eq!(libc::pthread_rwlock_wrlock(lock.0.get() as *mut _), 0);
let lock_copy = lock.clone();
thread::spawn(move || {
assert_eq!(libc::pthread_rwlock_unlock(lock_copy.0.get() as *mut _), 0); //~ ERROR: Undefined Behavior: unlocked an rwlock that was not locked by the active thread
})
.join()
.unwrap();
}
}
@@ -0,0 +1,47 @@
// ignore-windows: No libc on Windows
// ignore-macos: pthread_condattr_setclock is not supported on MacOS.
// compile-flags: -Zmiri-disable-isolation
#![feature(rustc_private)]
/// Test that conditional variable timeouts are working properly with both
/// monotonic and system clocks.
extern crate libc;
use std::mem;
use std::time::Instant;
fn test_timed_wait_timeout(clock_id: i32) {
unsafe {
let mut attr: libc::pthread_condattr_t = mem::zeroed();
assert_eq!(libc::pthread_condattr_init(&mut attr as *mut _), 0);
assert_eq!(libc::pthread_condattr_setclock(&mut attr as *mut _, clock_id), 0);
let mut cond: libc::pthread_cond_t = mem::zeroed();
assert_eq!(libc::pthread_cond_init(&mut cond as *mut _, &attr as *const _), 0);
assert_eq!(libc::pthread_condattr_destroy(&mut attr as *mut _), 0);
let mut mutex: libc::pthread_mutex_t = mem::zeroed();
let mut now: libc::timespec = mem::zeroed();
assert_eq!(libc::clock_gettime(clock_id, &mut now), 0);
let timeout = libc::timespec { tv_sec: now.tv_sec + 1, tv_nsec: now.tv_nsec };
assert_eq!(libc::pthread_mutex_lock(&mut mutex as *mut _), 0);
let current_time = Instant::now();
assert_eq!(
libc::pthread_cond_timedwait(&mut cond as *mut _, &mut mutex as *mut _, &timeout),
libc::ETIMEDOUT
);
let elapsed_time = current_time.elapsed().as_millis();
assert!(900 <= elapsed_time && elapsed_time <= 1300);
assert_eq!(libc::pthread_mutex_unlock(&mut mutex as *mut _), 0);
assert_eq!(libc::pthread_mutex_destroy(&mut mutex as *mut _), 0);
assert_eq!(libc::pthread_cond_destroy(&mut cond as *mut _), 0);
}
}
fn main() {
test_timed_wait_timeout(libc::CLOCK_MONOTONIC);
test_timed_wait_timeout(libc::CLOCK_REALTIME);
}
-75
View File
@@ -1,75 +0,0 @@
// ignore-windows: Concurrency on Windows is not supported yet.
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
fn check_mutex() {
let data = Arc::new(Mutex::new(0));
let mut threads = Vec::new();
for _ in 0..3 {
let data = Arc::clone(&data);
let thread = thread::spawn(move || {
let mut data = data.lock().unwrap();
thread::yield_now();
*data += 1;
});
threads.push(thread);
}
for thread in threads {
thread.join().unwrap();
}
assert!(data.try_lock().is_ok());
let data = Arc::try_unwrap(data).unwrap().into_inner().unwrap();
assert_eq!(data, 3);
}
fn check_rwlock_write() {
let data = Arc::new(RwLock::new(0));
let mut threads = Vec::new();
for _ in 0..3 {
let data = Arc::clone(&data);
let thread = thread::spawn(move || {
let mut data = data.write().unwrap();
thread::yield_now();
*data += 1;
});
threads.push(thread);
}
for thread in threads {
thread.join().unwrap();
}
assert!(data.try_write().is_ok());
let data = Arc::try_unwrap(data).unwrap().into_inner().unwrap();
assert_eq!(data, 3);
}
fn check_rwlock_read_no_deadlock() {
let l1 = Arc::new(RwLock::new(0));
let l2 = Arc::new(RwLock::new(0));
let l1_copy = Arc::clone(&l1);
let l2_copy = Arc::clone(&l2);
let _guard1 = l1.read().unwrap();
let handle = thread::spawn(move || {
let _guard2 = l2_copy.read().unwrap();
thread::yield_now();
let _guard1 = l1_copy.read().unwrap();
});
thread::yield_now();
let _guard2 = l2.read().unwrap();
handle.join().unwrap();
}
fn main() {
check_mutex();
check_rwlock_write();
check_rwlock_read_no_deadlock();
}
+283
View File
@@ -0,0 +1,283 @@
// ignore-windows: Concurrency on Windows is not supported yet.
// compile-flags: -Zmiri-disable-isolation
use std::sync::mpsc::{channel, sync_channel};
use std::sync::{Arc, Barrier, Condvar, Mutex, Once, RwLock};
use std::thread;
use std::time::{Duration, Instant};
// Check if Rust barriers are working.
/// This test is taken from the Rust documentation.
fn check_barriers() {
let mut handles = Vec::with_capacity(10);
let barrier = Arc::new(Barrier::new(10));
for _ in 0..10 {
let c = barrier.clone();
// The same messages will be printed together.
// You will NOT see any interleaving.
handles.push(thread::spawn(move || {
println!("before wait");
c.wait();
println!("after wait");
}));
}
// Wait for other threads to finish.
for handle in handles {
handle.join().unwrap();
}
}
// Check if Rust conditional variables are working.
/// The test taken from the Rust documentation.
fn check_conditional_variables_notify_one() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();
// Spawn a new thread.
thread::spawn(move || {
thread::yield_now();
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});
// Wait for the thread to fully start up.
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
}
fn check_conditional_variables_notify_all() {
let pair = Arc::new(((Mutex::new(())), Condvar::new()));
// Spawn threads and block them on the conditional variable.
let handles: Vec<_> = (0..5)
.map(|_| {
let pair2 = pair.clone();
thread::spawn(move || {
let (lock, cvar) = &*pair2;
let guard = lock.lock().unwrap();
// Block waiting on the conditional variable.
let _ = cvar.wait(guard).unwrap();
})
})
.inspect(|_| {
thread::yield_now();
thread::yield_now();
})
.collect();
let (_, cvar) = &*pair;
// Unblock all threads.
cvar.notify_all();
for handle in handles {
handle.join().unwrap();
}
}
/// Test that waiting on a conditional variable with a timeout does not
/// deadlock.
fn check_conditional_variables_timed_wait_timeout() {
let lock = Mutex::new(());
let cvar = Condvar::new();
let guard = lock.lock().unwrap();
let now = Instant::now();
let (_guard, timeout) = cvar.wait_timeout(guard, Duration::from_millis(100)).unwrap();
assert!(timeout.timed_out());
let elapsed_time = now.elapsed().as_millis();
assert!(100 <= elapsed_time && elapsed_time <= 300);
}
/// Test that signaling a conditional variable when waiting with a timeout works
/// as expected.
fn check_conditional_variables_timed_wait_notimeout() {
let pair = Arc::new((Mutex::new(()), Condvar::new()));
let pair2 = pair.clone();
let (lock, cvar) = &*pair;
let guard = lock.lock().unwrap();
let handle = thread::spawn(move || {
let (_lock, cvar) = &*pair2;
cvar.notify_one();
});
let (_guard, timeout) = cvar.wait_timeout(guard, Duration::from_millis(100)).unwrap();
assert!(!timeout.timed_out());
handle.join().unwrap();
}
// Check if locks are working.
fn check_mutex() {
let data = Arc::new(Mutex::new(0));
let mut threads = Vec::new();
for _ in 0..3 {
let data = Arc::clone(&data);
let thread = thread::spawn(move || {
let mut data = data.lock().unwrap();
thread::yield_now();
*data += 1;
});
threads.push(thread);
}
for thread in threads {
thread.join().unwrap();
}
assert!(data.try_lock().is_ok());
let data = Arc::try_unwrap(data).unwrap().into_inner().unwrap();
assert_eq!(data, 3);
}
fn check_rwlock_write() {
let data = Arc::new(RwLock::new(0));
let mut threads = Vec::new();
for _ in 0..3 {
let data = Arc::clone(&data);
let thread = thread::spawn(move || {
let mut data = data.write().unwrap();
thread::yield_now();
*data += 1;
});
threads.push(thread);
}
for thread in threads {
thread.join().unwrap();
}
assert!(data.try_write().is_ok());
let data = Arc::try_unwrap(data).unwrap().into_inner().unwrap();
assert_eq!(data, 3);
}
fn check_rwlock_read_no_deadlock() {
let l1 = Arc::new(RwLock::new(0));
let l2 = Arc::new(RwLock::new(0));
let l1_copy = Arc::clone(&l1);
let l2_copy = Arc::clone(&l2);
let _guard1 = l1.read().unwrap();
let handle = thread::spawn(move || {
let _guard2 = l2_copy.read().unwrap();
thread::yield_now();
let _guard1 = l1_copy.read().unwrap();
});
thread::yield_now();
let _guard2 = l2.read().unwrap();
handle.join().unwrap();
}
// Check if channels are working.
/// The test taken from the Rust documentation.
fn simple_send() {
let (tx, rx) = channel();
thread::spawn(move || {
tx.send(10).unwrap();
});
assert_eq!(rx.recv().unwrap(), 10);
}
/// The test taken from the Rust documentation.
fn multiple_send() {
let (tx, rx) = channel();
for i in 0..10 {
let tx = tx.clone();
thread::spawn(move || {
tx.send(i).unwrap();
});
}
let mut sum = 0;
for _ in 0..10 {
let j = rx.recv().unwrap();
assert!(0 <= j && j < 10);
sum += j;
}
assert_eq!(sum, 45);
}
/// The test taken from the Rust documentation.
fn send_on_sync() {
let (sender, receiver) = sync_channel(1);
// this returns immediately
sender.send(1).unwrap();
thread::spawn(move || {
// this will block until the previous message has been received
sender.send(2).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
}
// Check if Rust once statics are working.
static mut VAL: usize = 0;
static INIT: Once = Once::new();
fn get_cached_val() -> usize {
unsafe {
INIT.call_once(|| {
VAL = expensive_computation();
});
VAL
}
}
fn expensive_computation() -> usize {
let mut i = 1;
let mut c = 1;
while i < 1000 {
i *= c;
c += 1;
}
i
}
/// The test taken from the Rust documentation.
fn check_once() {
let handles: Vec<_> = (0..10)
.map(|_| {
thread::spawn(|| {
thread::yield_now();
let val = get_cached_val();
assert_eq!(val, 5040);
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
}
fn main() {
check_barriers();
check_conditional_variables_notify_one();
check_conditional_variables_notify_all();
check_conditional_variables_timed_wait_timeout();
check_conditional_variables_timed_wait_notimeout();
check_mutex();
check_rwlock_write();
check_rwlock_read_no_deadlock();
simple_send();
multiple_send();
send_on_sync();
check_once();
}
+20
View File
@@ -0,0 +1,20 @@
before wait
before wait
before wait
before wait
before wait
before wait
before wait
before wait
before wait
before wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait