std: rebase fixups and cancelation changes

This commit includes some API changes which I agreed with Andrew as a
follow-up to the recent `Io.Group` changes:

* `Io.Group.await` *does* propagate cancelation to group tasks; it then
  waits for them to complete, and *also* returns `error.Canceled`. The
  assertion that group tasks handle `error.Canceled` "correctly" means
  this behavior is loosely analogous to how awaiting a future works. The
  important thing is that the semantics of `Group.await` and
  `Future.await` are similar, and `error.Canceled` will always be
  visible to the caller (assuming correct API usage).

* `Io.Group.awaitUncancelable` is removed.

* `Future.await` calls `recancel` only if the "child" task (the future
  being awaited) did not acknowledge cancelation. If it did, then it is
  assumed that the future will propagate `error.Canceled` through
  `await` as needed.
This commit is contained in:
Matthew Lugg
2025-12-31 01:10:07 +00:00
parent b8a09bcbd9
commit f306a9f84a
6 changed files with 86 additions and 39 deletions
+1 -1
View File
@@ -849,7 +849,7 @@ fn runStepNames(
defer f.deinit();
f.start();
f.waitAndPrintReport();
try f.waitAndPrintReport();
}
// Every test has a state
+2 -2
View File
@@ -513,11 +513,11 @@ fn addEntryPoint(fuzz: *Fuzz, coverage_id: u64, addr: u64) error{ AlreadyReporte
try coverage_map.entry_points.append(fuzz.gpa, @intCast(index));
}
pub fn waitAndPrintReport(fuzz: *Fuzz) void {
pub fn waitAndPrintReport(fuzz: *Fuzz) Io.Cancelable!void {
assert(fuzz.mode == .limit);
const io = fuzz.io;
fuzz.group.awaitUncancelable(io);
try fuzz.group.await(io);
fuzz.group = .init;
std.debug.print("======= FUZZING REPORT =======\n", .{});
+3 -12
View File
@@ -1111,7 +1111,9 @@ pub const Group = struct {
}
/// Blocks until all tasks of the group finish. During this time,
/// cancelation requests propagate to all members of the group.
/// cancelation requests propagate to all members of the group, and
/// will also cause `error.Canceled` to be returned when the group
/// does ultimately finish.
///
/// Idempotent. Not threadsafe.
///
@@ -1124,17 +1126,6 @@ pub const Group = struct {
assert(g.token.raw == null);
}
/// Equivalent to `await` but temporarily blocks cancelation while waiting.
pub fn awaitUncancelable(g: *Group, io: Io) void {
const token = g.token.load(.acquire) orelse return;
const prev = swapCancelProtection(io, .blocked);
defer _ = swapCancelProtection(io, prev);
io.vtable.groupAwait(io.userdata, g, token) catch |err| switch (err) {
error.Canceled => unreachable,
};
assert(g.token.raw == null);
}
/// Equivalent to `await` but immediately requests cancelation on all
/// members of the group.
///
+76 -20
View File
@@ -182,7 +182,7 @@ const Group = struct {
const Task = struct {
runnable: Runnable,
group: *Io.Group,
func: *const fn (context: *const anyopaque) void,
func: *const fn (context: *const anyopaque) Io.Cancelable!void,
context_alignment: Alignment,
alloc_len: usize,
@@ -192,7 +192,7 @@ const Group = struct {
group: Group,
context: []const u8,
context_alignment: Alignment,
func: *const fn (context: *const anyopaque) void,
func: *const fn (context: *const anyopaque) Io.Cancelable!void,
) Allocator.Error!*Task {
const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(Task);
const worst_case_context_offset = context_alignment.forward(@sizeOf(Task) + max_context_misalignment);
@@ -247,7 +247,20 @@ const Group = struct {
}, .monotonic);
}
assertGroupResult(task.func(task.contextPointer()));
const result = task.func(task.contextPointer());
const cancel_acknowledged = switch (thread.status.load(.monotonic).cancelation) {
.none, .canceling => false,
.canceled => true,
.parked => unreachable,
.blocked => unreachable,
.blocked_windows_dns => unreachable,
.blocked_canceling => unreachable,
};
if (result) {
assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled`
} else |err| switch (err) {
error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled
}
thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic);
const old_status = group.status().fetchSub(.{
@@ -348,7 +361,8 @@ const Future = struct {
pending_awaited = 0b01,
/// Like `pending`, but the future is being canceled. `Future.awaiter` is populated.
pending_canceled = 0b11,
/// The future has already completed. `thread` is `null`.
/// The future has already completed. `thread` is `.null`, unless the future terminated
/// with an acknowledged cancel request, in which case `thread` is `.all_ones`.
done = 0b10,
},
/// When the future begins execution, this is atomically updated from `null` to the thread running the
@@ -437,10 +451,18 @@ const Future = struct {
future.func(future.contextPointer(), future.resultPointer());
const had_acknowledged_cancel = switch (thread.status.load(.monotonic).cancelation) {
.none, .canceling => false,
.canceled => true,
.parked => unreachable,
.blocked => unreachable,
.blocked_windows_dns => unreachable,
.blocked_canceling => unreachable,
};
thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic);
const old_status = future.status.swap(.{
.tag = .done,
.thread = .null,
.thread = if (had_acknowledged_cancel) .all_ones else .null,
}, .acq_rel); // acquire `future.awaiter`, release results
switch (old_status.tag) {
.pending => {},
@@ -1712,11 +1734,11 @@ fn groupAsync(
const t: *Threaded = @ptrCast(@alignCast(userdata));
const g: Group = .{ .ptr = type_erased };
if (builtin.single_threaded) return start(context.ptr) catch unreachable;
if (builtin.single_threaded) return groupAsyncEager(start, context.ptr);
const gpa = t.allocator;
const task = Group.Task.create(gpa, g, context, context_alignment, start) catch |err| switch (err) {
error.OutOfMemory => return t.assertGroupResult(start(context.ptr)),
error.OutOfMemory => return groupAsyncEager(start, context.ptr),
};
t.mutex.lock();
@@ -1726,7 +1748,7 @@ fn groupAsync(
if (busy_count >= @intFromEnum(t.async_limit)) {
t.mutex.unlock();
task.destroy(gpa);
return t.assertGroupResult(start(context.ptr));
return groupAsyncEager(start, context.ptr);
}
t.busy_count = busy_count + 1;
@@ -1739,7 +1761,7 @@ fn groupAsync(
t.busy_count = busy_count;
t.mutex.unlock();
task.destroy(gpa);
return t.assertGroupResult(start(context.ptr));
return groupAsyncEager(start, context.ptr);
};
thread.detach();
}
@@ -1757,23 +1779,45 @@ fn groupAsync(
t.mutex.unlock();
t.cond.signal();
}
fn assertGroupResult(result: Io.Cancelable!void) void {
const cancel_acknowledged = if (Thread.current) |thread|
switch (thread.status.load(.monotonic).cancelation) {
fn groupAsyncEager(
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
context: *const anyopaque,
) void {
const pre_acknowledged = if (Thread.current) |thread| ack: {
break :ack switch (thread.status.load(.monotonic).cancelation) {
.none, .canceling => false,
.canceled => true,
.parked => unreachable,
.blocked => unreachable,
.blocked_windows_dns => unreachable,
.blocked_canceling => unreachable,
}
else
false;
};
} else false;
const result = start(context);
const post_acknowledged = if (Thread.current) |thread| ack: {
break :ack switch (thread.status.load(.monotonic).cancelation) {
.none, .canceling => false,
.canceled => true,
.parked => unreachable,
.blocked => unreachable,
.blocked_windows_dns => unreachable,
.blocked_canceling => unreachable,
};
} else false;
if (result) {
assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled`
if (pre_acknowledged) {
assert(post_acknowledged); // group task called `recancel` but was not canceled
} else {
assert(!post_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled`
}
} else |err| switch (err) {
error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled
// Don't swallow the cancelation: make it visible to the `Group.async` caller.
error.Canceled => {
assert(!pre_acknowledged); // group task called `recancel` but was not canceled
assert(post_acknowledged); // group task returned `error.Canceled` but was never canceled
recancelInner();
},
}
}
@@ -1920,6 +1964,9 @@ fn groupCancel(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *an
fn recancel(userdata: ?*anyopaque) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
recancelInner();
}
fn recancelInner() void {
const thread = Thread.current.?; // called `recancel` but was not canceled
switch (thread.status.fetchXor(.{
.cancelation = @enumFromInt(0b001),
@@ -1993,7 +2040,16 @@ fn await(
future.waitForCancelWithSignaling(t, &num_completed, null);
},
}
recancel(t);
// If the future did not acknowledge the cancelation, we need to mark it outstanding
// for us. Because `future.status.tag == .done`, the information about whether there
// was an acknowledged cancelation is encoded in `future.status.thread`.
const final_status = future.status.load(.monotonic);
assert(final_status.tag == .done);
switch (final_status.thread) {
.null => recancelInner(), // cancelation was not acknowledged, so it's ours
.all_ones => {}, // cancelation was acknowledged, so it was this task's job to propagate it
_ => unreachable,
}
},
},
.pending_awaited => unreachable, // `await` raced with `await`
@@ -11669,7 +11725,7 @@ fn unlockStderr(userdata: ?*anyopaque) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
t.stderr_writer.interface.flush() catch |err| switch (err) {
error.WriteFailed => switch (t.stderr_writer.err.?) {
error.Canceled => recancel(t),
error.Canceled => recancelInner(),
else => {},
},
};
+1 -1
View File
@@ -124,7 +124,7 @@ test "Group.async context alignment" {
var group: std.Io.Group = .init;
var result: ByteArray512 = undefined;
group.async(io, concatByteArraysResultPtr, .{ a, b, &result });
group.awaitUncancelable(io);
try group.await(io);
try std.testing.expectEqualSlices(u8, &expected.x, &result.x);
}
+3 -3
View File
@@ -194,7 +194,7 @@ test "Group" {
group.async(io, count, .{ 1, 10, &results[0] });
group.async(io, count, .{ 20, 30, &results[1] });
group.awaitUncancelable(io);
try group.await(io);
try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
}
@@ -544,9 +544,9 @@ test "tasks spawned in group after Group.cancel are canceled" {
group.concurrent(io, blockUntilCanceled, .{io}) catch {};
group.async(io, blockUntilCanceled, .{io});
}
fn blockUntilCanceled(io: Io) void {
fn blockUntilCanceled(io: Io) Io.Cancelable!void {
while (true) io.sleep(.fromSeconds(100_000), .awake) catch |err| switch (err) {
error.Canceled => return,
error.Canceled => |e| return e,
error.UnsupportedClock => @panic("unsupported clock"),
error.Unexpected => @panic("unexpected"),
};