mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-04-28 03:17:08 +03:00
2adfd4d107
Rename `wait` to `await` to be consistent with Future API.

The convention here is that this set of functionality goes together:
* async/concurrent
* await/cancel

Also rename Select `wait` to `await` for the same reason.

`Group.await` now can return `error.Canceled`. Furthermore, `Group.await` does not auto-propagate cancelation. Instead, users should follow the pattern of `defer group.cancel(io);` after initialization, and doing `try group.await(io);` at the end of the success path. Advanced logic can choose to do something other than this pattern in the event of cancelation.

Additionally, fixes a bug in `std.Io.Threaded` future await, in which it swallowed an `error.Canceled`. Now if a task is canceled while awaiting a future, after propagating the cancel request, it also recancels, meaning that the awaiting task will properly detect its own cancelation at the next cancelation point.

Furthermore, fixes a bug in the compiler where `error.Canceled` was being swallowed in `dispatchPrelinkWork`.

Finally, fixes std.crypto code that inappropriately used `catch unreachable` in response to cancelation without even so much as a comment explaining why it was believed to be unreachable. Now, those functions have `error.Canceled` in the error set and propagate cancelation properly.

With this way of doing things, `Group.await` has a nice property: even if all tasks in the group are CPU bound and without cancelation points, the `Group.await` can still be canceled. In such case, the task that was waiting for `await` wakes up with a chance to do some more resource cleanup tasks, such as canceling more things, before entering the deferred `Group.cancel` call at which point it has to suspend until the canceled but uninterruptible CPU bound tasks complete.

closes #30601
144 lines
4.4 KiB
Zig
144 lines
4.4 KiB
Zig
//! Tests belong here if they access internal state of std.Io.Threaded or
//! otherwise assume details of that particular implementation.
|
|
const builtin = @import("builtin");
|
|
|
|
const std = @import("std");
|
|
const Io = std.Io;
|
|
const testing = std.testing;
|
|
const assert = std.debug.assert;
|
|
|
|
// Starts `put` with `io.concurrent` while `async_limit` is `.nothing`, then
// receives the value on the main task through a queue backed by an empty
// buffer.
test "concurrent vs main prevents deadlock via oversubscription" {
    // Disabled for now, see https://codeberg.org/ziglang/zig/issues/30141
    if (true) return error.SkipZigTest;

    var backend = Io.Threaded.init(testing.allocator, .{});
    defer backend.deinit();
    const io = backend.io();

    backend.async_limit = .nothing;

    var channel = Io.Queue(u8).init(&.{});

    var producer = io.concurrent(put, .{ io, &channel }) catch |err| switch (err) {
        // The test only tolerates missing concurrency on single-threaded builds.
        error.ConcurrencyUnavailable => {
            try testing.expect(builtin.single_threaded);
            return;
        },
    };
    defer producer.cancel(io);

    const received = channel.getOneUncancelable(io);
    try testing.expectEqual(42, received);
}
|
|
|
|
/// Task body for the queue tests: places the byte 42 on `queue` using
/// `putOneUncancelable`.
fn put(io: Io, queue: *Io.Queue(u8)) void {
    const payload: u8 = 42;
    queue.putOneUncancelable(io, payload);
}
|
|
|
|
/// Task body for the queue tests: takes one byte from `queue` using
/// `getOneUncancelable` and asserts that it is 42.
fn get(io: Io, queue: *Io.Queue(u8)) void {
    const received = queue.getOneUncancelable(io);
    assert(received == 42);
}
|
|
|
|
// Starts both `put` and `get` with `io.concurrent` while `async_limit` is
// `.nothing`, sharing a queue backed by an empty buffer, then awaits both.
test "concurrent vs concurrent prevents deadlock via oversubscription" {
    // Disabled for now, see https://codeberg.org/ziglang/zig/issues/30141
    if (true) return error.SkipZigTest;

    var backend = Io.Threaded.init(testing.allocator, .{});
    defer backend.deinit();
    const io = backend.io();

    backend.async_limit = .nothing;

    var channel = Io.Queue(u8).init(&.{});

    var producer = io.concurrent(put, .{ io, &channel }) catch |err| switch (err) {
        // The test only tolerates missing concurrency on single-threaded builds.
        error.ConcurrencyUnavailable => {
            try testing.expect(builtin.single_threaded);
            return;
        },
    };
    defer producer.cancel(io);

    // Any failure to start the second task is reported as a test error.
    var consumer = try io.concurrent(get, .{ io, &channel });
    defer consumer.cancel(io);

    consumer.await(io);
    producer.await(io);
}
|
|
|
|
/// 32 bytes (256 bits) of payload whose field is aligned to 32 bytes.
const ByteArray256 = struct {
    x: [32]u8 align(32),
};
|
|
/// 64 bytes (512 bits) of payload whose field is aligned to 64 bytes.
const ByteArray512 = struct {
    x: [64]u8 align(64),
};
|
|
|
|
/// Returns a `ByteArray512` whose `x` field holds the bytes of `a.x`
/// followed by the bytes of `b.x`.
fn concatByteArrays(a: ByteArray256, b: ByteArray256) ByteArray512 {
    const joined = a.x ++ b.x;
    return ByteArray512{ .x = joined };
}
|
|
|
|
// Runs `concatByteArrays` once through `io.async` and once through
// `io.concurrent`, using a `Threaded` whose allocator is a fixed 2048-byte
// buffer aligned to `@alignOf(ByteArray512)`, and checks the returned bytes.
test "async/concurrent context and result alignment" {
    var backing: [2048]u8 align(@alignOf(ByteArray512)) = undefined;
    var fixed = std.heap.FixedBufferAllocator.init(&backing);

    var threaded = Io.Threaded.init(fixed.allocator(), .{});
    defer threaded.deinit();
    const io = threaded.io();

    const twos: [32]u8 = @splat(2);
    const threes: [32]u8 = @splat(3);
    const a: ByteArray256 = .{ .x = twos };
    const b: ByteArray256 = .{ .x = threes };
    const expected: ByteArray512 = .{ .x = twos ++ threes };

    // Via `io.async`.
    {
        var task = io.async(concatByteArrays, .{ a, b });
        const result = task.await(io);
        try testing.expectEqualSlices(u8, &expected.x, &result.x);
    }

    // Via `io.concurrent`.
    {
        var task = io.concurrent(concatByteArrays, .{ a, b }) catch |err| switch (err) {
            // The test only tolerates missing concurrency on single-threaded builds.
            error.ConcurrencyUnavailable => {
                try testing.expect(builtin.single_threaded);
                return;
            },
        };
        const result = task.await(io);
        try testing.expectEqualSlices(u8, &expected.x, &result.x);
    }
}
|
|
|
|
/// Stores the bytes of `a.x` followed by the bytes of `b.x` into `result`
/// instead of returning them.
fn concatByteArraysResultPtr(a: ByteArray256, b: ByteArray256, result: *ByteArray512) void {
    // `x` is the only field of `ByteArray512`, so this overwrites the whole value.
    result.x = a.x ++ b.x;
}
|
|
|
|
// Runs `concatByteArraysResultPtr` through `Group.async`, using a `Threaded`
// whose allocator is a fixed 2048-byte buffer aligned to
// `@alignOf(ByteArray512)`, and checks the bytes written through the pointer.
test "Group.async context alignment" {
    var backing: [2048]u8 align(@alignOf(ByteArray512)) = undefined;
    var fixed = std.heap.FixedBufferAllocator.init(&backing);

    var threaded = Io.Threaded.init(fixed.allocator(), .{});
    defer threaded.deinit();
    const io = threaded.io();

    const twos: [32]u8 = @splat(2);
    const threes: [32]u8 = @splat(3);
    const a: ByteArray256 = .{ .x = twos };
    const b: ByteArray256 = .{ .x = threes };
    const expected: ByteArray512 = .{ .x = twos ++ threes };

    var group: Io.Group = .init;
    // Filled in by the task; read only after the group has been awaited.
    var result: ByteArray512 = undefined;
    group.async(io, concatByteArraysResultPtr, .{ a, b, &result });
    group.awaitUncancelable(io);

    try testing.expectEqualSlices(u8, &expected.x, &result.x);
}
|
|
|
|
/// Returns a 32-byte array in which every element is 5.
fn returnArray() [32]u8 {
    return [_]u8{5} ** 32;
}
|
|
|
|
// Runs `returnArray` through `io.async` and checks that the awaited result
// is 32 bytes of value 5.
test "async with array return type" {
    var threaded = Io.Threaded.init(testing.allocator, .{});
    defer threaded.deinit();
    const io = threaded.io();

    var task = io.async(returnArray, .{});
    const result = task.await(io);

    const expected: [32]u8 = @splat(5);
    try testing.expectEqualSlices(u8, &expected, &result);
}
|