std.Io.Condition: separate wait impls for clarity

also:
* add docs
* add test coverage for waitUncancelable
* explicit error set declaration WaitTimeoutError
This commit is contained in:
Andrew Kelley
2026-04-16 14:51:25 -07:00
parent 078185a54b
commit c0763b5e25
2 changed files with 106 additions and 24 deletions
+62 -23
View File
@@ -1681,28 +1681,29 @@ pub const Condition = struct {
.epoch = .init(0),
};
pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
waitInner(cond, io, mutex, .{ .timeout = .none }) catch |err| switch (err) {
error.Timeout => unreachable,
error.Canceled => return error.Canceled,
};
}
pub fn waitTimeout(cond: *Condition, io: Io, mutex: *Mutex, timeout: Timeout) (Cancelable || Timeout.Error)!void {
return waitInner(cond, io, mutex, .{ .timeout = timeout.toDeadline(io) });
}
/// Same as `wait`, except does not introduce a cancelation point.
/// Blocks until the condition is signaled or canceled.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void {
waitInner(cond, io, mutex, .uncancelable) catch |err| switch (err) {
/// See also:
/// * `waitUncancelable`
/// * `waitTimeout`
pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
waitTimeout(cond, io, mutex, .none) catch |err| switch (err) {
error.Timeout => unreachable,
error.Canceled => unreachable,
error.Canceled => |e| return e,
};
}
fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, mode: union(enum) { uncancelable, timeout: Timeout }) (Cancelable || Timeout.Error)!void {
pub const WaitTimeoutError = Cancelable || Timeout.Error;
/// Blocks until the condition is signaled, canceled, or the provided
/// timeout expires.
///
/// See also:
/// * `wait`
/// * `waitUncancelable`
pub fn waitTimeout(cond: *Condition, io: Io, mutex: *Mutex, timeout: Timeout) WaitTimeoutError!void {
const deadline = timeout.toDeadline(io);
var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load
{
@@ -1714,10 +1715,7 @@ pub const Condition = struct {
defer mutex.lockUncancelable(io);
while (true) {
const result = switch (mode) {
.uncancelable => io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch),
.timeout => |t| io.futexWaitTimeout(u32, &cond.epoch.raw, epoch, t),
};
const result = io.futexWaitTimeout(u32, &cond.epoch.raw, epoch, deadline);
epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` laod
@@ -1745,13 +1743,54 @@ pub const Condition = struct {
assert(prev_state.waiters > 0); // underflow caused by illegal state
return err;
};
if (mode == .timeout and mode.timeout != .none) {
if (mode.timeout.deadline.untilNow(io).raw.nanoseconds >= 0) {
switch (deadline) {
.none => {},
.deadline => |d| if (d.untilNow(io).raw.nanoseconds >= 0) {
const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic);
assert(prev_state.waiters > 0); // underflow caused by illegal state
return error.Timeout;
},
.duration => unreachable,
}
}
}
/// Same as `wait`, except does not introduce a cancelation point.
///
/// See `Future.cancel` for a description of cancelation points.
pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void {
var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load
{
const prev_state = cond.state.fetchAdd(.{ .waiters = 1, .signals = 0 }, .monotonic);
assert(prev_state.waiters < math.maxInt(u16)); // overflow caused by too many waiters
}
mutex.unlock(io);
defer mutex.lockUncancelable(io);
while (true) {
io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch);
epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` laod
// Even on error, try to consume a pending signal first. Otherwise a race might
// cause a signal to get stuck in the state with no corresponding waiter.
{
var prev_state = cond.state.load(.monotonic);
while (prev_state.signals > 0) {
prev_state = cond.state.cmpxchgWeak(prev_state, .{
.waiters = prev_state.waiters - 1,
.signals = prev_state.signals - 1,
}, .acquire, .monotonic) orelse {
// We successfully consumed a signal.
return;
};
}
}
// There are no more signals available; this was a spurious wakeup,
// so we'll loop back to the futex wait.
}
}
+44 -1
View File
@@ -972,7 +972,7 @@ test "Select.cancel with no tasks, no deadlock" {
try expectEqual(null, select.cancel());
}
test "Condition" {
test "Condition.waitTimeout" {
const io = testing.io;
const Context = struct {
@@ -1016,3 +1016,46 @@ test "Condition" {
try future.await(io);
}
test "Condition.waitUncancelable" {
const io = testing.io;
const Context = struct {
ready: Io.Event = .unset,
mutex: Io.Mutex = .init,
cond: Io.Condition = .init,
value: u32 = 0,
fn worker(ctx: *@This()) !void {
defer ctx.ready.set(io);
try ctx.mutex.lock(io);
defer ctx.mutex.unlock(io);
try expectEqual(0, ctx.value);
ctx.ready.set(io);
ctx.cond.waitUncancelable(io, &ctx.mutex);
while (ctx.value == 0) try ctx.cond.wait(io, &ctx.mutex);
try expectEqual(1, ctx.value);
}
};
var ctx: Context = .{};
var future = io.concurrent(Context.worker, .{&ctx}) catch |err| switch (err) {
error.ConcurrencyUnavailable => return error.SkipZigTest,
};
defer future.cancel(io) catch {};
try ctx.ready.wait(io);
try ctx.mutex.lock(io);
ctx.value = 1;
ctx.mutex.unlock(io);
ctx.cond.signal(io);
try future.await(io);
}