From d821446cf92b9b8974ec4a3e5d7be883d1741b4e Mon Sep 17 00:00:00 2001 From: Lukas Lalinsky Date: Thu, 19 Feb 2026 12:07:54 +0100 Subject: [PATCH] Implement `Condition.waitTimeout` I'd have preferred if `vtable.futexWait` returned `error.Timeout`, since all the OS-level APIs provide it. However, if I keep the vtable untouched, I had to determine the timeout case by post-checking the deadline. It's fine functionally, but one extra syscall that be avoided at cost of changing the vtable and all the futex implementations. --- lib/std/Io.zig | 34 +++++++++++++++++++++++++--------- lib/std/Io/test.zig | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 9 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 03b33dae06..0db3087abc 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1682,19 +1682,27 @@ pub const Condition = struct { }; pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void { - try waitInner(cond, io, mutex, false); + waitInner(cond, io, mutex, .{ .timeout = .none }) catch |err| switch (err) { + error.Timeout => unreachable, + error.Canceled => return error.Canceled, + }; + } + + pub fn waitTimeout(cond: *Condition, io: Io, mutex: *Mutex, timeout: Timeout) (Cancelable || Timeout.Error)!void { + return waitInner(cond, io, mutex, .{ .timeout = timeout.toDeadline(io) }); } /// Same as `wait`, except does not introduce a cancelation point. /// /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void { - waitInner(cond, io, mutex, true) catch |err| switch (err) { + waitInner(cond, io, mutex, .uncancelable) catch |err| switch (err) { + error.Timeout => unreachable, error.Canceled => unreachable, }; } - fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, uncancelable: bool) Cancelable!void { + fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, mode: union(enum) { uncancelable, timeout: Timeout }) (Cancelable || Timeout.Error)!void { var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load { @@ -1706,10 +1714,10 @@ pub const Condition = struct { defer mutex.lockUncancelable(io); while (true) { - const result = if (uncancelable) - io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch) - else - io.futexWait(u32, &cond.epoch.raw, epoch); + const result = switch (mode) { + .uncancelable => io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch), + .timeout => |t| io.futexWaitTimeout(u32, &cond.epoch.raw, epoch, t), + }; epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` laod @@ -1729,13 +1737,21 @@ pub const Condition = struct { } // There are no more signals available; this was a spurious wakeup or an error. If it - // was an error, we will remove ourselves as a waiter and return that error. Otherwise, - // we'll loop back to the futex wait. + // was an error, we will remove ourselves as a waiter and return that error. If a + // timeout was specified and the deadline has passed, we remove ourselves as a waiter + // and return `error.Timeout`. Otherwise, we'll loop back to the futex wait. result catch |err| { const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic); assert(prev_state.waiters > 0); // underflow caused by illegal state return err; }; + if (mode == .timeout and mode.timeout != .none) { + if (mode.timeout.deadline.untilNow(io).raw.nanoseconds >= 0) { + const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic); + assert(prev_state.waiters > 0); // underflow caused by illegal state + return error.Timeout; + } + } } } diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index bd6b433d05..37c6b78c39 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -971,3 +971,47 @@ test "Select.cancel with no tasks, no deadlock" { var select: Io.Select(U) = .init(io, &.{}); try expectEqual(null, select.cancel()); } + +test "Condition" { + if (builtin.single_threaded) return error.SkipZigTest; + const io = testing.io; + + const TestContext = struct { + ready: Io.Event = .unset, + mutex: Io.Mutex = .init, + cond: Io.Condition = .init, + value: u32 = 0, + + fn worker(ctx: *@This()) !void { + defer ctx.ready.set(io); + + try ctx.mutex.lock(io); + defer ctx.mutex.unlock(io); + + try expectError(error.Timeout, ctx.cond.waitTimeout(io, &ctx.mutex, .{ .duration = .{ + .raw = .fromMilliseconds(1), + .clock = .awake, + } })); + try expectEqual(0, ctx.value); + + ctx.ready.set(io); + + while (ctx.value == 0) try ctx.cond.wait(io, &ctx.mutex); + try expectEqual(1, ctx.value); + } + }; + + var ctx: TestContext = .{}; + + var future = try io.concurrent(TestContext.worker, .{&ctx}); + defer future.cancel(io) catch {}; + + try ctx.ready.wait(io); + + try ctx.mutex.lock(io); + ctx.value = 1; + ctx.mutex.unlock(io); + ctx.cond.signal(io); + + try future.await(io); +}