From f306a9f84a006a6429f485a6c99ac26723f1e1e4 Mon Sep 17 00:00:00 2001 From: Matthew Lugg Date: Wed, 31 Dec 2025 01:10:07 +0000 Subject: [PATCH] std: rebase fixups and cancelation changes This commit includes some API changes which I agreed with Andrew as a follow-up to the recent `Io.Group` changes: * `Io.Group.await` *does* propagate cancelation to group tasks; it then waits for them to complete, and *also* returns `error.Canceled`. The assertion that group tasks handle `error.Canceled` "correctly" means this behavior is loosely analagous to how awaiting a future works. The important thing is that the semantics of `Group.await` and `Future.await` are similar, and `error.Canceled` will always be visible to the caller (assuming correct API usage). * `Io.Group.awaitUncancelable` is removed. * `Future.await` calls `recancel` only if the "child" task (the future being awaited) did not acknowledge cancelation. If it did, then it is assumed that the future will propagate `error.Canceled` through `await` as needed. --- lib/compiler/build_runner.zig | 2 +- lib/std/Build/Fuzz.zig | 4 +- lib/std/Io.zig | 15 ++---- lib/std/Io/Threaded.zig | 96 +++++++++++++++++++++++++++-------- lib/std/Io/Threaded/test.zig | 2 +- lib/std/Io/test.zig | 6 +-- 6 files changed, 86 insertions(+), 39 deletions(-) diff --git a/lib/compiler/build_runner.zig b/lib/compiler/build_runner.zig index 768a203796..f5315d6496 100644 --- a/lib/compiler/build_runner.zig +++ b/lib/compiler/build_runner.zig @@ -849,7 +849,7 @@ fn runStepNames( defer f.deinit(); f.start(); - f.waitAndPrintReport(); + try f.waitAndPrintReport(); } // Every test has a state diff --git a/lib/std/Build/Fuzz.zig b/lib/std/Build/Fuzz.zig index 5521f7393b..30e6e81241 100644 --- a/lib/std/Build/Fuzz.zig +++ b/lib/std/Build/Fuzz.zig @@ -513,11 +513,11 @@ fn addEntryPoint(fuzz: *Fuzz, coverage_id: u64, addr: u64) error{ AlreadyReporte try coverage_map.entry_points.append(fuzz.gpa, @intCast(index)); } -pub fn waitAndPrintReport(fuzz: *Fuzz) void { +pub fn waitAndPrintReport(fuzz: *Fuzz) Io.Cancelable!void { assert(fuzz.mode == .limit); const io = fuzz.io; - fuzz.group.awaitUncancelable(io); + try fuzz.group.await(io); fuzz.group = .init; std.debug.print("======= FUZZING REPORT =======\n", .{}); diff --git a/lib/std/Io.zig b/lib/std/Io.zig index fd463b8d55..21fb71286f 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1111,7 +1111,9 @@ pub const Group = struct { } /// Blocks until all tasks of the group finish. During this time, - /// cancelation requests propagate to all members of the group. + /// cancelation requests propagate to all members of the group, and + /// will also cause `error.Canceled` to be returned when the group + /// does ultimately finish. /// /// Idempotent. Not threadsafe. /// @@ -1124,17 +1126,6 @@ pub const Group = struct { assert(g.token.raw == null); } - /// Equivalent to `await` but temporarily blocks cancelation while waiting. - pub fn awaitUncancelable(g: *Group, io: Io) void { - const token = g.token.load(.acquire) orelse return; - const prev = swapCancelProtection(io, .blocked); - defer _ = swapCancelProtection(io, prev); - io.vtable.groupAwait(io.userdata, g, token) catch |err| switch (err) { - error.Canceled => unreachable, - }; - assert(g.token.raw == null); - } - /// Equivalent to `await` but immediately requests cancelation on all /// members of the group. /// diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index fe1d8b07cb..e16e70d6a7 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -182,7 +182,7 @@ const Group = struct { const Task = struct { runnable: Runnable, group: *Io.Group, - func: *const fn (context: *const anyopaque) void, + func: *const fn (context: *const anyopaque) Io.Cancelable!void, context_alignment: Alignment, alloc_len: usize, @@ -192,7 +192,7 @@ const Group = struct { group: Group, context: []const u8, context_alignment: Alignment, - func: *const fn (context: *const anyopaque) void, + func: *const fn (context: *const anyopaque) Io.Cancelable!void, ) Allocator.Error!*Task { const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(Task); const worst_case_context_offset = context_alignment.forward(@sizeOf(Task) + max_context_misalignment); @@ -247,7 +247,20 @@ const Group = struct { }, .monotonic); } - assertGroupResult(task.func(task.contextPointer())); + const result = task.func(task.contextPointer()); + const cancel_acknowledged = switch (thread.status.load(.monotonic).cancelation) { + .none, .canceling => false, + .canceled => true, + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + }; + if (result) { + assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled` + } else |err| switch (err) { + error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled + } thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic); const old_status = group.status().fetchSub(.{ @@ -348,7 +361,8 @@ const Future = struct { pending_awaited = 0b01, /// Like `pending`, but the future is being canceled. `Future.awaiter` is populated. pending_canceled = 0b11, - /// The future has already completed. `thread` is `null`. + /// The future has already completed. `thread` is `.null`, unless the future terminated + /// with an acknowledged cancel request, in which case `thread` is `.all_ones`. done = 0b10, }, /// When the future begins execution, this is atomically updated from `null` to the thread running the @@ -437,10 +451,18 @@ const Future = struct { future.func(future.contextPointer(), future.resultPointer()); + const had_acknowledged_cancel = switch (thread.status.load(.monotonic).cancelation) { + .none, .canceling => false, + .canceled => true, + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + }; thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic); const old_status = future.status.swap(.{ .tag = .done, - .thread = .null, + .thread = if (had_acknowledged_cancel) .all_ones else .null, }, .acq_rel); // acquire `future.awaiter`, release results switch (old_status.tag) { .pending => {}, @@ -1712,11 +1734,11 @@ fn groupAsync( const t: *Threaded = @ptrCast(@alignCast(userdata)); const g: Group = .{ .ptr = type_erased }; - if (builtin.single_threaded) return start(context.ptr) catch unreachable; + if (builtin.single_threaded) return groupAsyncEager(start, context.ptr); const gpa = t.allocator; const task = Group.Task.create(gpa, g, context, context_alignment, start) catch |err| switch (err) { - error.OutOfMemory => return t.assertGroupResult(start(context.ptr)), + error.OutOfMemory => return groupAsyncEager(start, context.ptr), }; t.mutex.lock(); @@ -1726,7 +1748,7 @@ fn groupAsync( if (busy_count >= @intFromEnum(t.async_limit)) { t.mutex.unlock(); task.destroy(gpa); - return t.assertGroupResult(start(context.ptr)); + return groupAsyncEager(start, context.ptr); } t.busy_count = busy_count + 1; @@ -1739,7 +1761,7 @@ fn groupAsync( t.busy_count = busy_count; t.mutex.unlock(); task.destroy(gpa); - return t.assertGroupResult(start(context.ptr)); + return groupAsyncEager(start, context.ptr); }; thread.detach(); } @@ -1757,23 +1779,45 @@ fn groupAsync( t.mutex.unlock(); t.cond.signal(); } - -fn assertGroupResult(result: Io.Cancelable!void) void { - const cancel_acknowledged = if (Thread.current) |thread| - switch (thread.status.load(.monotonic).cancelation) { +fn groupAsyncEager( + start: *const fn (context: *const anyopaque) Io.Cancelable!void, + context: *const anyopaque, +) void { + const pre_acknowledged = if (Thread.current) |thread| ack: { + break :ack switch (thread.status.load(.monotonic).cancelation) { .none, .canceling => false, .canceled => true, .parked => unreachable, .blocked => unreachable, .blocked_windows_dns => unreachable, .blocked_canceling => unreachable, - } - else - false; + }; + } else false; + const result = start(context); + const post_acknowledged = if (Thread.current) |thread| ack: { + break :ack switch (thread.status.load(.monotonic).cancelation) { + .none, .canceling => false, + .canceled => true, + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + }; + } else false; + if (result) { - assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled` + if (pre_acknowledged) { + assert(post_acknowledged); // group task called `recancel` but was not canceled + } else { + assert(!post_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled` + } } else |err| switch (err) { - error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled + // Don't swallow the cancelation: make it visible to the `Group.async` caller. + error.Canceled => { + assert(!pre_acknowledged); // group task called `recancel` but was not canceled + assert(post_acknowledged); // group task returned `error.Canceled` but was never canceled + recancelInner(); + }, } } @@ -1920,6 +1964,9 @@ fn groupCancel(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *an fn recancel(userdata: ?*anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; + recancelInner(); +} +fn recancelInner() void { const thread = Thread.current.?; // called `recancel` but was not canceled switch (thread.status.fetchXor(.{ .cancelation = @enumFromInt(0b001), @@ -1993,7 +2040,16 @@ fn await( future.waitForCancelWithSignaling(t, &num_completed, null); }, } - recancel(t); + // If the future did not acknowledge the cancelation, we need to mark it outstanding + // for us. Because `future.status.tag == .done`, the information about whether there + // was an acknowledged cancelation is encoded in `future.status.thread`. + const final_status = future.status.load(.monotonic); + assert(final_status.tag == .done); + switch (final_status.thread) { + .null => recancelInner(), // cancelation was not acknowledged, so it's ours + .all_ones => {}, // cancelation was acknowledged, so it was this task's job to propagate it + _ => unreachable, + } }, }, .pending_awaited => unreachable, // `await` raced with `await` @@ -11669,7 +11725,7 @@ fn unlockStderr(userdata: ?*anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); t.stderr_writer.interface.flush() catch |err| switch (err) { error.WriteFailed => switch (t.stderr_writer.err.?) { - error.Canceled => recancel(t), + error.Canceled => recancelInner(), else => {}, }, }; diff --git a/lib/std/Io/Threaded/test.zig b/lib/std/Io/Threaded/test.zig index dfe0cf6c85..74bba51b90 100644 --- a/lib/std/Io/Threaded/test.zig +++ b/lib/std/Io/Threaded/test.zig @@ -124,7 +124,7 @@ test "Group.async context alignment" { var group: std.Io.Group = .init; var result: ByteArray512 = undefined; group.async(io, concatByteArraysResultPtr, .{ a, b, &result }); - group.awaitUncancelable(io); + try group.await(io); try std.testing.expectEqualSlices(u8, &expected.x, &result.x); } diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index 7f298586e4..a467ee1a93 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -194,7 +194,7 @@ test "Group" { group.async(io, count, .{ 1, 10, &results[0] }); group.async(io, count, .{ 20, 30, &results[1] }); - group.awaitUncancelable(io); + try group.await(io); try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results); } @@ -544,9 +544,9 @@ test "tasks spawned in group after Group.cancel are canceled" { group.concurrent(io, blockUntilCanceled, .{io}) catch {}; group.async(io, blockUntilCanceled, .{io}); } - fn blockUntilCanceled(io: Io) void { + fn blockUntilCanceled(io: Io) Io.Cancelable!void { while (true) io.sleep(.fromSeconds(100_000), .awake) catch |err| switch (err) { - error.Canceled => return, + error.Canceled => |e| return e, error.UnsupportedClock => @panic("unsupported clock"), error.Unexpected => @panic("unexpected"), };