diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 0db3087abc..b44cbe8b4e 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1681,28 +1681,29 @@ pub const Condition = struct { .epoch = .init(0), }; - pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void { - waitInner(cond, io, mutex, .{ .timeout = .none }) catch |err| switch (err) { - error.Timeout => unreachable, - error.Canceled => return error.Canceled, - }; - } - - pub fn waitTimeout(cond: *Condition, io: Io, mutex: *Mutex, timeout: Timeout) (Cancelable || Timeout.Error)!void { - return waitInner(cond, io, mutex, .{ .timeout = timeout.toDeadline(io) }); - } - - /// Same as `wait`, except does not introduce a cancelation point. + /// Blocks until the condition is signaled or canceled. /// - /// For a description of cancelation and cancelation points, see `Future.cancel`. - pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void { - waitInner(cond, io, mutex, .uncancelable) catch |err| switch (err) { + /// See also: + /// * `waitUncancelable` + /// * `waitTimeout` + pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void { + waitTimeout(cond, io, mutex, .none) catch |err| switch (err) { error.Timeout => unreachable, - error.Canceled => unreachable, + error.Canceled => |e| return e, }; } - fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, mode: union(enum) { uncancelable, timeout: Timeout }) (Cancelable || Timeout.Error)!void { + pub const WaitTimeoutError = Cancelable || Timeout.Error; + + /// Blocks until the condition is signaled, canceled, or the provided + /// timeout expires. + /// + /// See also: + /// * `wait` + /// * `waitUncancelable` + pub fn waitTimeout(cond: *Condition, io: Io, mutex: *Mutex, timeout: Timeout) WaitTimeoutError!void { + const deadline = timeout.toDeadline(io); + var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load { @@ -1714,10 +1715,7 @@ pub const Condition = struct { defer mutex.lockUncancelable(io); while (true) { - const result = switch (mode) { - .uncancelable => io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch), - .timeout => |t| io.futexWaitTimeout(u32, &cond.epoch.raw, epoch, t), - }; + const result = io.futexWaitTimeout(u32, &cond.epoch.raw, epoch, deadline); epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` laod @@ -1745,13 +1743,54 @@ pub const Condition = struct { assert(prev_state.waiters > 0); // underflow caused by illegal state return err; }; - if (mode == .timeout and mode.timeout != .none) { - if (mode.timeout.deadline.untilNow(io).raw.nanoseconds >= 0) { + switch (deadline) { + .none => {}, + .deadline => |d| if (d.untilNow(io).raw.nanoseconds >= 0) { const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic); assert(prev_state.waiters > 0); // underflow caused by illegal state return error.Timeout; + }, + .duration => unreachable, + } + } + } + + /// Same as `wait`, except does not introduce a cancelation point. + /// + /// See `Future.cancel` for a description of cancelation points. + pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void { + var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load + + { + const prev_state = cond.state.fetchAdd(.{ .waiters = 1, .signals = 0 }, .monotonic); + assert(prev_state.waiters < math.maxInt(u16)); // overflow caused by too many waiters + } + + mutex.unlock(io); + defer mutex.lockUncancelable(io); + + while (true) { + io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch); + + epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` laod + + // Even on error, try to consume a pending signal first. Otherwise a race might + // cause a signal to get stuck in the state with no corresponding waiter. + { + var prev_state = cond.state.load(.monotonic); + while (prev_state.signals > 0) { + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters - 1, + .signals = prev_state.signals - 1, + }, .acquire, .monotonic) orelse { + // We successfully consumed a signal. + return; + }; } } + + // There are no more signals available; this was a spurious wakeup, + // so we'll loop back to the futex wait. } } diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index cd40a7782b..20426acb9c 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -972,7 +972,7 @@ test "Select.cancel with no tasks, no deadlock" { try expectEqual(null, select.cancel()); } -test "Condition" { +test "Condition.waitTimeout" { const io = testing.io; const Context = struct { @@ -1016,3 +1016,46 @@ test "Condition" { try future.await(io); } + +test "Condition.waitUncancelable" { + const io = testing.io; + + const Context = struct { + ready: Io.Event = .unset, + mutex: Io.Mutex = .init, + cond: Io.Condition = .init, + value: u32 = 0, + + fn worker(ctx: *@This()) !void { + defer ctx.ready.set(io); + + try ctx.mutex.lock(io); + defer ctx.mutex.unlock(io); + + try expectEqual(0, ctx.value); + + ctx.ready.set(io); + + ctx.cond.waitUncancelable(io, &ctx.mutex); + + while (ctx.value == 0) try ctx.cond.wait(io, &ctx.mutex); + try expectEqual(1, ctx.value); + } + }; + + var ctx: Context = .{}; + + var future = io.concurrent(Context.worker, .{&ctx}) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + defer future.cancel(io) catch {}; + + try ctx.ready.wait(io); + + try ctx.mutex.lock(io); + ctx.value = 1; + ctx.mutex.unlock(io); + ctx.cond.signal(io); + + try future.await(io); +}