mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-04-26 13:01:34 +03:00
std.Io: fix and improve Group API
Rename `wait` to `await` to be consistent with Future API. The convention here is that this set of functionality goes together: * async/concurrent * await/cancel Also rename Select `wait` to `await` for the same reason. `Group.await` now can return `error.Canceled`. Furthermore, `Group.await` does not auto-propagate cancelation. Instead, users should follow the pattern of `defer group.cancel(io);` after initialization, and doing `try group.await(io);` at the end of the success path. Advanced logic can choose to do something other than this pattern in the event of cancelation. Additionally, fixes a bug in `std.Io.Threaded` future await, in which it swallowed an `error.Canceled`. Now if a task is canceled while awaiting a future, after propagating the cancel request, it also recancels, meaning that the awaiting task will properly detect its own cancelation at the next cancelation point. Furthermore, fixes a bug in the compiler where `error.Canceled` was being swallowed in `dispatchPrelinkWork`. Finally, fixes std.crypto code that inappropriately used `catch unreachable` in response to cancelation without even so much as a comment explaining why it was believed to be unreachable. Now, those functions have `error.Canceled` in the error set and propagate cancelation properly. With this way of doing things, `Group.await` has a nice property: even if all tasks in the group are CPU bound and without cancelation points, the `Group.await` can still be canceled. In such case, the task that was waiting for `await` wakes up with a chance to do some more resource cleanup tasks, such as canceling more things, before entering the deferred `Group.cancel` call at which point it has to suspend until the canceled but uninterruptible CPU bound tasks complete. closes #30601
This commit is contained in:
@@ -748,7 +748,7 @@ fn runStepNames(
|
||||
defer step_prog.end();
|
||||
|
||||
var group: Io.Group = .init;
|
||||
defer group.wait(io);
|
||||
defer group.cancel(io);
|
||||
|
||||
// Here we spawn the initial set of tasks with a nice heuristic -
|
||||
// dependency order. Each worker when it finishes a step will then
|
||||
@@ -760,6 +760,8 @@ fn runStepNames(
|
||||
|
||||
group.async(io, workerMakeOneStep, .{ &group, b, step, step_prog, run });
|
||||
}
|
||||
|
||||
try group.await(io);
|
||||
}
|
||||
|
||||
assert(run.memory_blocked_steps.items.len == 0);
|
||||
@@ -820,7 +822,7 @@ fn runStepNames(
|
||||
// * Memory-mapping to share data between the fuzzer and build runner.
|
||||
// * COFF/PE support added to `std.debug.Info` (it needs a batching API for resolving
|
||||
// many addresses to source locations).
|
||||
.windows => fatal("--fuzz not yet implemented for {s}", .{@tagName(builtin.os.tag)}),
|
||||
.windows => fatal("--fuzz not yet implemented for {t}", .{builtin.os.tag}),
|
||||
else => {},
|
||||
}
|
||||
if (@bitSizeOf(usize) != 64) {
|
||||
@@ -843,7 +845,7 @@ fn runStepNames(
|
||||
step_stack.keys(),
|
||||
parent_prog_node,
|
||||
mode,
|
||||
) catch |err| fatal("failed to start fuzzer: {s}", .{@errorName(err)});
|
||||
) catch |err| fatal("failed to start fuzzer: {t}", .{err});
|
||||
defer f.deinit();
|
||||
|
||||
f.start();
|
||||
|
||||
@@ -78,7 +78,7 @@ pub fn init(
|
||||
all_steps: []const *Build.Step,
|
||||
root_prog_node: std.Progress.Node,
|
||||
mode: Mode,
|
||||
) Allocator.Error!Fuzz {
|
||||
) error{ OutOfMemory, Canceled }!Fuzz {
|
||||
const run_steps: []const *Step.Run = steps: {
|
||||
var steps: std.ArrayList(*Step.Run) = .empty;
|
||||
defer steps.deinit(gpa);
|
||||
@@ -98,7 +98,7 @@ pub fn init(
|
||||
if (steps.items.len == 0) fatal("no fuzz tests found", .{});
|
||||
rebuild_node.setEstimatedTotalItems(steps.items.len);
|
||||
const run_steps = try gpa.dupe(*Step.Run, steps.items);
|
||||
rebuild_group.wait(io);
|
||||
try rebuild_group.await(io);
|
||||
break :steps run_steps;
|
||||
};
|
||||
errdefer gpa.free(run_steps);
|
||||
@@ -517,7 +517,7 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void {
|
||||
assert(fuzz.mode == .limit);
|
||||
const io = fuzz.io;
|
||||
|
||||
fuzz.group.wait(io);
|
||||
fuzz.group.awaitUncancelable(io);
|
||||
fuzz.group = .init;
|
||||
|
||||
std.debug.print("======= FUZZING REPORT =======\n", .{});
|
||||
|
||||
+23
-12
@@ -436,7 +436,7 @@ pub fn Poller(comptime StreamEnum: type) type {
|
||||
// Cancel the pending read into the FIFO.
|
||||
_ = windows.kernel32.CancelIo(handle);
|
||||
|
||||
// We have to wait for the handle to be signalled, i.e. for the cancellation to complete.
|
||||
// We have to wait for the handle to be signalled, i.e. for the cancelation to complete.
|
||||
switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
|
||||
windows.WAIT_OBJECT_0 => {},
|
||||
windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
|
||||
@@ -644,7 +644,7 @@ pub const VTable = struct {
|
||||
context_alignment: std.mem.Alignment,
|
||||
start: *const fn (*Group, context: *const anyopaque) void,
|
||||
) ConcurrentError!void,
|
||||
groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
|
||||
groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void,
|
||||
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
|
||||
|
||||
recancel: *const fn (?*anyopaque) void,
|
||||
@@ -1023,7 +1023,7 @@ pub fn Future(Result: type) type {
|
||||
any_future: ?*AnyFuture,
|
||||
result: Result,
|
||||
|
||||
/// Equivalent to `await` but places a cancellation request. This causes the task to receive
|
||||
/// Equivalent to `await` but places a cancelation request. This causes the task to receive
|
||||
/// `error.Canceled` from its next "cancelation point" (if any). A cancelation point is a
|
||||
/// call to a function in `Io` which can return `error.Canceled`.
|
||||
///
|
||||
@@ -1071,7 +1071,7 @@ pub const Group = struct {
|
||||
/// already been called and completed, or it has successfully been assigned
|
||||
/// a unit of concurrency.
|
||||
///
|
||||
/// After this is called, `wait` or `cancel` must be called before the
|
||||
/// After this is called, `await` or `cancel` must be called before the
|
||||
/// group is deinitialized.
|
||||
///
|
||||
/// Threadsafe.
|
||||
@@ -1092,11 +1092,11 @@ pub const Group = struct {
|
||||
}
|
||||
|
||||
/// Calls `function` with `args`, such that the function is not guaranteed
|
||||
/// to have returned until `wait` is called, allowing the caller to
|
||||
/// to have returned until `await` is called, allowing the caller to
|
||||
/// progress while waiting for any `Io` operations.
|
||||
///
|
||||
/// The resource spawned is owned by the group; after this is called,
|
||||
/// `wait` or `cancel` must be called before the group is deinitialized.
|
||||
/// `await` or `cancel` must be called before the group is deinitialized.
|
||||
///
|
||||
/// This has stronger guarantee than `async`, placing restrictions on what kind
|
||||
/// of `Io` implementations are supported. By calling `async` instead, one
|
||||
@@ -1120,20 +1120,31 @@ pub const Group = struct {
|
||||
}
|
||||
|
||||
/// Blocks until all tasks of the group finish. During this time,
|
||||
/// cancellation requests propagate to all members of the group.
|
||||
/// cancelation requests propagate to all members of the group.
|
||||
///
|
||||
/// Idempotent. Not threadsafe.
|
||||
///
|
||||
/// It is safe to call this function concurrently with `Group.async` or
|
||||
/// `Group.concurrent`, provided that the group does not complete until
|
||||
/// the call to `Group.async` or `Group.concurrent` returns.
|
||||
pub fn wait(g: *Group, io: Io) void {
|
||||
pub fn await(g: *Group, io: Io) Cancelable!void {
|
||||
const token = g.token.load(.acquire) orelse return;
|
||||
io.vtable.groupWait(io.userdata, g, token);
|
||||
try io.vtable.groupAwait(io.userdata, g, token);
|
||||
assert(g.token.raw == null);
|
||||
}
|
||||
|
||||
/// Equivalent to `wait` but immediately requests cancellation on all
|
||||
/// Equivalent to `await` but temporarily blocks cancelation while waiting.
|
||||
pub fn awaitUncancelable(g: *Group, io: Io) void {
|
||||
const token = g.token.load(.acquire) orelse return;
|
||||
const prev = swapCancelProtection(io, .blocked);
|
||||
defer _ = swapCancelProtection(io, prev);
|
||||
io.vtable.groupAwait(io.userdata, g, token) catch |err| switch (err) {
|
||||
error.Canceled => unreachable,
|
||||
};
|
||||
assert(g.token.raw == null);
|
||||
}
|
||||
|
||||
/// Equivalent to `await` but immediately requests cancelation on all
|
||||
/// members of the group.
|
||||
///
|
||||
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
||||
@@ -1272,7 +1283,7 @@ pub fn Select(comptime U: type) type {
|
||||
/// Asserts there is at least one more `outstanding` task.
|
||||
///
|
||||
/// Not threadsafe.
|
||||
pub fn wait(s: *S) Cancelable!U {
|
||||
pub fn await(s: *S) Cancelable!U {
|
||||
s.outstanding -= 1;
|
||||
return s.queue.getOne(s.io) catch |err| switch (err) {
|
||||
error.Canceled => |e| return e,
|
||||
@@ -1280,7 +1291,7 @@ pub fn Select(comptime U: type) type {
|
||||
};
|
||||
}
|
||||
|
||||
/// Equivalent to `wait` but requests cancellation on all remaining
|
||||
/// Equivalent to `wait` but requests cancelation on all remaining
|
||||
/// tasks owned by the select.
|
||||
///
|
||||
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
||||
|
||||
+12
-13
@@ -795,7 +795,7 @@ pub fn io(t: *Threaded) Io {
|
||||
|
||||
.groupAsync = groupAsync,
|
||||
.groupConcurrent = groupConcurrent,
|
||||
.groupWait = groupWait,
|
||||
.groupAwait = groupAwait,
|
||||
.groupCancel = groupCancel,
|
||||
|
||||
.recancel = recancel,
|
||||
@@ -933,7 +933,7 @@ pub fn ioBasic(t: *Threaded) Io {
|
||||
|
||||
.groupAsync = groupAsync,
|
||||
.groupConcurrent = groupConcurrent,
|
||||
.groupWait = groupWait,
|
||||
.groupAwait = groupAwait,
|
||||
.groupCancel = groupCancel,
|
||||
|
||||
.recancel = recancel,
|
||||
@@ -1166,6 +1166,7 @@ const AsyncClosure = struct {
|
||||
error.Canceled => {
|
||||
ac.closure.requestCancel(t);
|
||||
ac.event.waitUncancelable(ioBasic(t));
|
||||
recancel(t);
|
||||
},
|
||||
};
|
||||
@memcpy(result, ac.resultPointer()[0..result.len]);
|
||||
@@ -1452,7 +1453,7 @@ fn groupConcurrent(
|
||||
t.cond.signal();
|
||||
}
|
||||
|
||||
fn groupWait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) void {
|
||||
fn groupAwait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void {
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
const gpa = t.allocator;
|
||||
|
||||
@@ -1464,16 +1465,14 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque)
|
||||
const event: *Io.Event = @ptrCast(&group.context);
|
||||
const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire);
|
||||
assert(prev_state & GroupClosure.sync_is_waiting == 0);
|
||||
if ((prev_state / GroupClosure.sync_one_pending) > 0) event.wait(ioBasic(t)) catch |err| switch (err) {
|
||||
error.Canceled => {
|
||||
var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.load(.monotonic)));
|
||||
while (it) |node| : (it = node.next) {
|
||||
const gc: *GroupClosure = @fieldParentPtr("node", node);
|
||||
gc.closure.requestCancel(t);
|
||||
}
|
||||
event.waitUncancelable(ioBasic(t));
|
||||
},
|
||||
};
|
||||
{
|
||||
errdefer _ = group_state.fetchSub(GroupClosure.sync_is_waiting, .monotonic);
|
||||
// This event.wait can return error.Canceled, in which case this logic does
|
||||
// *not* propagate cancel requests to each group member. Instead, the user
|
||||
// code will likely do this with a defered call to groupCancel, or,
|
||||
// intentionally not do this.
|
||||
if ((prev_state / GroupClosure.sync_one_pending) > 0) try event.wait(ioBasic(t));
|
||||
}
|
||||
|
||||
// Since the group has now finished, it's illegal to add more tasks to it until we return. It's
|
||||
// also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only
|
||||
|
||||
@@ -124,7 +124,7 @@ test "Group.async context alignment" {
|
||||
var group: std.Io.Group = .init;
|
||||
var result: ByteArray512 = undefined;
|
||||
group.async(io, concatByteArraysResultPtr, .{ a, b, &result });
|
||||
group.wait(io);
|
||||
group.awaitUncancelable(io);
|
||||
try std.testing.expectEqualSlices(u8, &expected.x, &result.x);
|
||||
}
|
||||
|
||||
|
||||
@@ -289,7 +289,7 @@ pub fn connectMany(
|
||||
} else |err| switch (err) {
|
||||
error.Canceled => |e| return e,
|
||||
error.Closed => {
|
||||
group.wait(io);
|
||||
try group.await(io);
|
||||
return lookup_future.await(io);
|
||||
},
|
||||
}
|
||||
|
||||
+2
-2
@@ -194,7 +194,7 @@ test "Group" {
|
||||
group.async(io, count, .{ 1, 10, &results[0] });
|
||||
group.async(io, count, .{ 20, 30, &results[1] });
|
||||
|
||||
group.wait(io);
|
||||
group.awaitUncancelable(io);
|
||||
|
||||
try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
|
||||
}
|
||||
@@ -249,7 +249,7 @@ test "Group concurrent" {
|
||||
},
|
||||
};
|
||||
|
||||
group.wait(io);
|
||||
try group.await(io);
|
||||
|
||||
try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
|
||||
}
|
||||
|
||||
+1
-1
@@ -184,7 +184,7 @@ pub const pwhash = struct {
|
||||
|
||||
pub const Error = HasherError || error{AllocatorRequired};
|
||||
pub const HasherError = KdfError || phc_format.Error;
|
||||
pub const KdfError = errors.Error || std.mem.Allocator.Error || std.Thread.SpawnError;
|
||||
pub const KdfError = errors.Error || std.mem.Allocator.Error || std.Thread.SpawnError || std.Io.Cancelable;
|
||||
|
||||
pub const argon2 = @import("crypto/argon2.zig");
|
||||
pub const bcrypt = @import("crypto/bcrypt.zig");
|
||||
|
||||
+16
-14
@@ -2,9 +2,9 @@
|
||||
// https://github.com/golang/crypto/tree/master/argon2
|
||||
// https://github.com/P-H-C/phc-winner-argon2
|
||||
|
||||
const std = @import("std");
|
||||
const builtin = @import("builtin");
|
||||
|
||||
const std = @import("std");
|
||||
const blake2 = crypto.hash.blake2;
|
||||
const crypto = std.crypto;
|
||||
const Io = std.Io;
|
||||
@@ -53,23 +53,24 @@ pub const Mode = enum {
|
||||
pub const Params = struct {
|
||||
const Self = @This();
|
||||
|
||||
/// A [t]ime cost, which defines the amount of computation realized and therefore the execution
|
||||
/// Time cost, which defines the amount of computation realized and therefore the execution
|
||||
/// time, given in number of iterations.
|
||||
t: u32,
|
||||
|
||||
/// A [m]emory cost, which defines the memory usage, given in kibibytes.
|
||||
/// Memory cost, which defines the memory usage, given in kibibytes.
|
||||
m: u32,
|
||||
|
||||
/// A [p]arallelism degree, which defines the number of parallel threads.
|
||||
/// Parallelism degree, which defines the number of independent tasks,
|
||||
/// to be multiplexed onto threads when possible.
|
||||
p: u24,
|
||||
|
||||
/// The [secret] parameter, which is used for keyed hashing. This allows a secret key to be input
|
||||
/// The secret parameter, which is used for keyed hashing. This allows a secret key to be input
|
||||
/// at hashing time (from some external location) and be folded into the value of the hash. This
|
||||
/// means that even if your salts and hashes are compromised, an attacker cannot brute-force to
|
||||
/// find the password without the key.
|
||||
secret: ?[]const u8 = null,
|
||||
|
||||
/// The [ad] parameter, which is used to fold any additional data into the hash value. Functionally,
|
||||
/// The ad parameter, which is used to fold any additional data into the hash value. Functionally,
|
||||
/// this behaves almost exactly like the secret or salt parameters; the ad parameter is folding
|
||||
/// into the value of the hash. However, this parameter is used for different data. The salt
|
||||
/// should be a random string stored alongside your password. The secret should be a random key
|
||||
@@ -209,18 +210,18 @@ fn processBlocks(
|
||||
threads: u24,
|
||||
mode: Mode,
|
||||
io: Io,
|
||||
) void {
|
||||
) Io.Cancelable!void {
|
||||
const lanes = memory / threads;
|
||||
const segments = lanes / sync_points;
|
||||
|
||||
if (builtin.single_threaded or threads == 1) {
|
||||
processBlocksSt(blocks, time, memory, threads, mode, lanes, segments);
|
||||
processBlocksSync(blocks, time, memory, threads, mode, lanes, segments);
|
||||
} else {
|
||||
processBlocksMt(blocks, time, memory, threads, mode, lanes, segments, io);
|
||||
try processBlocksAsync(blocks, time, memory, threads, mode, lanes, segments, io);
|
||||
}
|
||||
}
|
||||
|
||||
fn processBlocksSt(
|
||||
fn processBlocksSync(
|
||||
blocks: *Blocks,
|
||||
time: u32,
|
||||
memory: u32,
|
||||
@@ -241,7 +242,7 @@ fn processBlocksSt(
|
||||
}
|
||||
}
|
||||
|
||||
fn processBlocksMt(
|
||||
fn processBlocksAsync(
|
||||
blocks: *Blocks,
|
||||
time: u32,
|
||||
memory: u32,
|
||||
@@ -250,19 +251,20 @@ fn processBlocksMt(
|
||||
lanes: u32,
|
||||
segments: u32,
|
||||
io: Io,
|
||||
) void {
|
||||
) Io.Cancelable!void {
|
||||
var n: u32 = 0;
|
||||
while (n < time) : (n += 1) {
|
||||
var slice: u32 = 0;
|
||||
while (slice < sync_points) : (slice += 1) {
|
||||
var group: Io.Group = .init;
|
||||
defer group.cancel(io);
|
||||
var lane: u24 = 0;
|
||||
while (lane < threads) : (lane += 1) {
|
||||
group.async(io, processSegment, .{
|
||||
blocks, time, memory, threads, mode, lanes, segments, n, slice, lane,
|
||||
});
|
||||
}
|
||||
group.wait(io);
|
||||
try group.await(io);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -503,7 +505,7 @@ pub fn kdf(
|
||||
blocks.appendNTimesAssumeCapacity(@splat(0), memory);
|
||||
|
||||
initBlocks(&blocks, &h0, memory, params.p);
|
||||
processBlocks(&blocks, params.t, memory, params.p, mode, io);
|
||||
try processBlocks(&blocks, params.t, memory, params.p, mode, io);
|
||||
finalize(&blocks, memory, params.p, derived_key);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
const std = @import("std");
|
||||
const builtin = @import("builtin");
|
||||
|
||||
const std = @import("std");
|
||||
const fmt = std.fmt;
|
||||
const mem = std.mem;
|
||||
const Io = std.Io;
|
||||
const Thread = std.Thread;
|
||||
const Allocator = std.mem.Allocator;
|
||||
|
||||
const Vec4 = @Vector(4, u32);
|
||||
const Vec8 = @Vector(8, u32);
|
||||
@@ -767,7 +769,7 @@ fn buildMerkleTreeLayerParallel(
|
||||
key: [8]u32,
|
||||
flags: Flags,
|
||||
io: Io,
|
||||
) void {
|
||||
) Io.Cancelable!void {
|
||||
const num_parents = input_cvs.len / 2;
|
||||
|
||||
// Process sequentially with SIMD for smaller tree layers to avoid thread overhead
|
||||
@@ -787,6 +789,7 @@ fn buildMerkleTreeLayerParallel(
|
||||
const num_workers = Thread.getCpuCount() catch 1;
|
||||
const parents_per_worker = (num_parents + num_workers - 1) / num_workers;
|
||||
var group: Io.Group = .init;
|
||||
defer group.cancel(io);
|
||||
|
||||
for (0..num_workers) |worker_id| {
|
||||
const start_idx = worker_id * parents_per_worker;
|
||||
@@ -801,7 +804,7 @@ fn buildMerkleTreeLayerParallel(
|
||||
.flags = flags,
|
||||
}});
|
||||
}
|
||||
group.wait(io);
|
||||
try group.await(io);
|
||||
}
|
||||
|
||||
fn parentOutput(parent_block: []const u8, key: [8]u32, flags: Flags) Output {
|
||||
@@ -987,7 +990,7 @@ pub const Blake3 = struct {
|
||||
d.final(out);
|
||||
}
|
||||
|
||||
pub fn hashParallel(b: []const u8, out: []u8, options: Options, allocator: std.mem.Allocator, io: Io) !void {
|
||||
pub fn hashParallel(b: []const u8, out: []u8, options: Options, allocator: Allocator, io: Io) error{ OutOfMemory, Canceled }!void {
|
||||
if (b.len < parallel_threshold) {
|
||||
return hash(b, out, options);
|
||||
}
|
||||
@@ -1008,6 +1011,7 @@ pub const Blake3 = struct {
|
||||
const num_workers = thread_count;
|
||||
const chunks_per_worker = (num_full_chunks + num_workers - 1) / num_workers;
|
||||
var group: Io.Group = .init;
|
||||
defer group.cancel(io);
|
||||
|
||||
for (0..num_workers) |worker_id| {
|
||||
const start_chunk = worker_id * chunks_per_worker;
|
||||
@@ -1022,7 +1026,7 @@ pub const Blake3 = struct {
|
||||
.flags = flags,
|
||||
}});
|
||||
}
|
||||
group.wait(io);
|
||||
try group.await(io);
|
||||
|
||||
// Build Merkle tree in parallel layers using ping-pong buffers
|
||||
const max_intermediate_size = (num_full_chunks + 1) / 2;
|
||||
@@ -1040,7 +1044,7 @@ pub const Blake3 = struct {
|
||||
const has_odd = current_level.len % 2 == 1;
|
||||
const next_level_size = num_parents + @intFromBool(has_odd);
|
||||
|
||||
buildMerkleTreeLayerParallel(
|
||||
try buildMerkleTreeLayerParallel(
|
||||
current_level[0 .. num_parents * 2],
|
||||
next_level_buf[0..num_parents],
|
||||
key_words,
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
const std = @import("std");
|
||||
const builtin = @import("builtin");
|
||||
|
||||
const std = @import("std");
|
||||
const crypto = std.crypto;
|
||||
const Allocator = std.mem.Allocator;
|
||||
const Io = std.Io;
|
||||
const Thread = std.Thread;
|
||||
const assert = std.debug.assert;
|
||||
|
||||
const TurboSHAKE128State = crypto.hash.sha3.TurboShake128(0x06);
|
||||
const TurboSHAKE256State = crypto.hash.sha3.TurboShake256(0x06);
|
||||
@@ -598,7 +599,7 @@ inline fn processNLeaves(
|
||||
output: []align(@alignOf(u64)) u8,
|
||||
) void {
|
||||
const cv_size = Variant.cv_size;
|
||||
comptime std.debug.assert(cv_size % @sizeOf(u64) == 0);
|
||||
comptime assert(cv_size % @sizeOf(u64) == 0);
|
||||
|
||||
if (view.tryGetSlice(j, j + N * chunk_size)) |leaf_data| {
|
||||
var leaf_cvs: [N * cv_size]u8 = undefined;
|
||||
@@ -645,7 +646,7 @@ fn processLeafBatch(comptime Variant: type, ctx: LeafBatchContext) void {
|
||||
j += chunk_len;
|
||||
}
|
||||
|
||||
std.debug.assert(cvs_offset == ctx.output_cvs.len);
|
||||
assert(cvs_offset == ctx.output_cvs.len);
|
||||
}
|
||||
|
||||
/// Helper to process N leaves in SIMD and absorb CVs into state
|
||||
@@ -841,7 +842,7 @@ fn ktMultiThreaded(
|
||||
total_len: usize,
|
||||
output: []u8,
|
||||
) !void {
|
||||
comptime std.debug.assert(bytes_per_batch % (optimal_vector_len * chunk_size) == 0);
|
||||
comptime assert(bytes_per_batch % (optimal_vector_len * chunk_size) == 0);
|
||||
|
||||
const cv_size = Variant.cv_size;
|
||||
const StateType = Variant.StateType;
|
||||
@@ -883,6 +884,7 @@ fn ktMultiThreaded(
|
||||
var pending_cv_lens: [256]usize = .{0} ** 256;
|
||||
|
||||
var select: Select = .init(io, select_buf);
|
||||
defer select.cancel();
|
||||
var batches_spawned: usize = 0;
|
||||
var next_to_process: usize = 0;
|
||||
|
||||
@@ -901,7 +903,7 @@ fn ktMultiThreaded(
|
||||
batches_spawned += 1;
|
||||
}
|
||||
|
||||
const result = select.wait() catch unreachable;
|
||||
const result = try select.await();
|
||||
const batch = result.batch;
|
||||
const slot = batch.batch_idx % max_concurrent;
|
||||
|
||||
@@ -925,7 +927,7 @@ fn ktMultiThreaded(
|
||||
}
|
||||
}
|
||||
|
||||
select.group.wait(io);
|
||||
assert(select.outstanding == 0);
|
||||
}
|
||||
|
||||
if (has_partial_leaf) {
|
||||
|
||||
+12
-6
@@ -4698,7 +4698,7 @@ fn performAllTheWork(
|
||||
});
|
||||
}
|
||||
|
||||
astgen_group.wait(io);
|
||||
try astgen_group.await(io);
|
||||
}
|
||||
|
||||
if (comp.zcu) |zcu| {
|
||||
@@ -4761,7 +4761,7 @@ fn performAllTheWork(
|
||||
// Since we're skipping analysis, there are no ZCU link tasks.
|
||||
comp.link_queue.finishZcuQueue(comp);
|
||||
// Let other compilation work finish to collect as many errors as possible.
|
||||
misc_group.wait(io);
|
||||
try misc_group.await(io);
|
||||
comp.link_queue.wait(io);
|
||||
return;
|
||||
}
|
||||
@@ -4850,18 +4850,22 @@ fn performAllTheWork(
|
||||
comp.link_queue.finishZcuQueue(comp);
|
||||
|
||||
// Main thread work is all done, now just wait for all async work.
|
||||
misc_group.wait(io);
|
||||
try misc_group.await(io);
|
||||
comp.link_queue.wait(io);
|
||||
}
|
||||
|
||||
fn dispatchPrelinkWork(comp: *Compilation, main_progress_node: std.Progress.Node) void {
|
||||
const io = comp.io;
|
||||
|
||||
// TODO should this function be cancelable?
|
||||
const prev_cancel_prot = io.swapCancelProtection(.blocked);
|
||||
defer _ = io.swapCancelProtection(prev_cancel_prot);
|
||||
|
||||
var prelink_group: Io.Group = .init;
|
||||
defer prelink_group.cancel(io);
|
||||
|
||||
comp.queuePrelinkTasks(comp.oneshot_prelink_tasks.items) catch |err| switch (err) {
|
||||
error.Canceled => return,
|
||||
error.Canceled => unreachable, // see swapCancelProtection above
|
||||
};
|
||||
comp.oneshot_prelink_tasks.clearRetainingCapacity();
|
||||
|
||||
@@ -5055,9 +5059,11 @@ fn dispatchPrelinkWork(comp: *Compilation, main_progress_node: std.Progress.Node
|
||||
});
|
||||
}
|
||||
|
||||
prelink_group.wait(io);
|
||||
prelink_group.await(io) catch |err| switch (err) {
|
||||
error.Canceled => unreachable, // see swapCancelProtection above
|
||||
};
|
||||
comp.link_queue.finishPrelinkQueue(comp) catch |err| switch (err) {
|
||||
error.Canceled => return,
|
||||
error.Canceled => unreachable, // see swapCancelProtection above
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
+10
-6
@@ -146,6 +146,8 @@ pub const JobQueue = struct {
|
||||
pub const UnlazySet = std.AutoArrayHashMapUnmanaged(Package.Hash, void);
|
||||
|
||||
pub fn deinit(jq: *JobQueue) void {
|
||||
const io = jq.io;
|
||||
jq.group.cancel(io);
|
||||
if (jq.all_fetches.items.len == 0) return;
|
||||
const gpa = jq.all_fetches.items[0].arena.child_allocator;
|
||||
jq.table.deinit(gpa);
|
||||
@@ -847,7 +849,7 @@ pub fn workerRun(f: *Fetch, prog_name: []const u8) void {
|
||||
|
||||
run(f) catch |err| switch (err) {
|
||||
error.OutOfMemory => f.oom_flag = true,
|
||||
error.Canceled => {},
|
||||
error.Canceled => {}, // TODO make groupAsync functions be cancelable and assert proper value was returned
|
||||
error.FetchFailed => {
|
||||
// Nothing to do because the errors are already reported in `error_bundle`,
|
||||
// and a reference is kept to the `Fetch` task inside `all_fetches`.
|
||||
@@ -1517,12 +1519,12 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
|
||||
// The final hash will be a hash of each file hashed independently. This
|
||||
// allows hashing in parallel.
|
||||
var group: Io.Group = .init;
|
||||
defer group.wait(io);
|
||||
defer group.cancel(io);
|
||||
|
||||
while (walker.next(io) catch |err| {
|
||||
try eb.addRootErrorMessage(.{ .msg = try eb.printString(
|
||||
"unable to walk temporary directory '{f}': {s}",
|
||||
.{ pkg_path, @errorName(err) },
|
||||
"unable to walk temporary directory '{f}': {t}",
|
||||
.{ pkg_path, err },
|
||||
) });
|
||||
return error.FetchFailed;
|
||||
}) |entry| {
|
||||
@@ -1552,8 +1554,8 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
|
||||
.file => .file,
|
||||
.sym_link => .link,
|
||||
else => return f.fail(f.location_tok, try eb.printString(
|
||||
"package contains '{s}' which has illegal file type '{s}'",
|
||||
.{ entry.path, @tagName(entry.kind) },
|
||||
"package contains '{s}' which has illegal file type '{t}'",
|
||||
.{ entry.path, entry.kind },
|
||||
)),
|
||||
};
|
||||
|
||||
@@ -1573,6 +1575,8 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
|
||||
group.async(io, workerHashFile, .{ io, root_dir, hashed_file });
|
||||
try all_files.append(hashed_file);
|
||||
}
|
||||
|
||||
try group.await(io);
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@@ -48,7 +48,7 @@ pub fn ParallelHasher(comptime Hasher: type) type {
|
||||
});
|
||||
}
|
||||
|
||||
group.wait(io);
|
||||
try group.await(io);
|
||||
}
|
||||
for (results) |result| _ = try result;
|
||||
}
|
||||
|
||||
+1
-1
@@ -5284,7 +5284,7 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
|
||||
);
|
||||
|
||||
job_queue.group.async(io, Package.Fetch.workerRun, .{ &fetch, "root" });
|
||||
job_queue.group.wait(io);
|
||||
try job_queue.group.await(io);
|
||||
|
||||
try job_queue.consolidateErrors();
|
||||
|
||||
|
||||
@@ -1951,7 +1951,7 @@ pub fn main() anyerror!void {
|
||||
} });
|
||||
}
|
||||
|
||||
group.wait(io);
|
||||
try group.await(io);
|
||||
}
|
||||
|
||||
const Job = struct {
|
||||
|
||||
Reference in New Issue
Block a user