Merge pull request 'std.Io: introduce batching and operations API, satisfying the "poll" use case' (#30743) from poll into master

Reviewed-on: https://codeberg.org/ziglang/zig/pulls/30743
This commit is contained in:
Andrew Kelley
2026-01-31 12:07:31 +01:00
28 changed files with 1608 additions and 902 deletions
+3 -39
View File
@@ -29,7 +29,7 @@ pub fn build(b: *std.Build) !void {
const use_zig_libcxx = b.option(bool, "use-zig-libcxx", "If libc++ is needed, use zig's bundled version, don't try to integrate with the system") orelse false;
const test_step = b.step("test", "Run all the tests");
const skip_install_lib_files = b.option(bool, "no-lib", "skip copying of lib/ files and langref to installation prefix. Useful for development") orelse false;
const skip_install_lib_files = b.option(bool, "no-lib", "skip copying of lib/ files and langref to installation prefix. Useful for development") orelse only_c;
const skip_install_langref = b.option(bool, "no-langref", "skip copying of langref to the installation prefix") orelse skip_install_lib_files;
const std_docs = b.option(bool, "std-docs", "include standard library autodocs") orelse false;
const no_bin = b.option(bool, "no-bin", "skip emitting compiler binary") orelse false;
@@ -472,27 +472,7 @@ pub fn build(b: *std.Build) !void {
.skip_linux = skip_linux,
.skip_llvm = skip_llvm,
.skip_libc = skip_libc,
.max_rss = switch (b.graph.host.result.os.tag) {
.freebsd => 2_000_000_000,
.linux => switch (b.graph.host.result.cpu.arch) {
.aarch64 => 659_809_075,
.loongarch64 => 598_902_374,
.powerpc64le => 627_431_833,
.riscv64 => 827_043_430,
.s390x => 580_596_121,
.x86_64 => 3_290_894_745,
else => 3_300_000_000,
},
.macos => switch (b.graph.host.result.cpu.arch) {
.aarch64 => 767_736_217,
else => 800_000_000,
},
.windows => switch (b.graph.host.result.cpu.arch) {
.x86_64 => 603_070_054,
else => 700_000_000,
},
else => 3_300_000_000,
},
.max_rss = 3_300_000_000,
}));
test_modules_step.dependOn(tests.addModuleTests(b, .{
@@ -518,23 +498,7 @@ pub fn build(b: *std.Build) !void {
.skip_llvm = skip_llvm,
.skip_libc = true,
.no_builtin = true,
.max_rss = switch (b.graph.host.result.os.tag) {
.freebsd => 800_000_000,
.linux => switch (b.graph.host.result.cpu.arch) {
.aarch64 => 639_565_414,
.loongarch64 => 598_884_352,
.powerpc64le => 597_897_625,
.riscv64 => 636_429_516,
.s390x => 574_166_630,
.x86_64 => 978_463_129,
else => 900_000_000,
},
.macos => switch (b.graph.host.result.cpu.arch) {
.aarch64 => 701_413_785,
else => 800_000_000,
},
else => 900_000_000,
},
.max_rss = 900_000_000,
}));
test_modules_step.dependOn(tests.addModuleTests(b, .{
+38 -17
View File
@@ -381,10 +381,17 @@ pub fn addError(step: *Step, comptime fmt: []const u8, args: anytype) error{OutO
pub const ZigProcess = struct {
child: std.process.Child,
poller: Io.Poller(StreamEnum),
multi_reader_buffer: Io.File.MultiReader.Buffer(2),
multi_reader: Io.File.MultiReader,
progress_ipc_fd: if (std.Progress.have_ipc) ?std.posix.fd_t else void,
pub const StreamEnum = enum { stdout, stderr };
pub fn deinit(zp: *ZigProcess, io: Io) void {
zp.child.kill(io);
zp.multi_reader.deinit();
zp.* = undefined;
}
};
/// Assumes that argv contains `--listen=-` and that the process being spawned
@@ -409,7 +416,8 @@ pub fn evalZigProcess(
assert(watch);
if (std.Progress.have_ipc) if (zp.progress_ipc_fd) |fd| prog_node.setIpcFd(fd);
const result = zigProcessUpdate(s, zp, watch, web_server, gpa) catch |err| switch (err) {
error.BrokenPipe => {
error.BrokenPipe, error.EndOfStream => |reason| {
std.log.info("{s} restart required: {t}", .{ argv[0], reason });
// Process restart required.
const term = zp.child.wait(io) catch |e| {
return s.fail("unable to wait for {s}: {t}", .{ argv[0], e });
@@ -455,18 +463,18 @@ pub fn evalZigProcess(
.request_resource_usage_statistics = true,
.progress_node = prog_node,
}) catch |err| return s.fail("failed to spawn zig compiler {s}: {t}", .{ argv[0], err });
defer if (!watch) zp.child.kill(io);
zp.* = .{
.child = zp.child,
.poller = Io.poll(gpa, ZigProcess.StreamEnum, .{
.stdout = zp.child.stdout.?,
.stderr = zp.child.stderr.?,
}),
.multi_reader_buffer = undefined,
.multi_reader = undefined,
.progress_ipc_fd = if (std.Progress.have_ipc) prog_node.getIpcFd() else {},
};
zp.multi_reader.init(gpa, io, zp.multi_reader_buffer.toStreams(), &.{
zp.child.stdout.?, zp.child.stderr.?,
});
if (watch) s.setZigProcess(zp);
defer if (!watch) zp.poller.deinit();
defer if (!watch) zp.deinit(io);
const result = try zigProcessUpdate(s, zp, watch, web_server, gpa);
@@ -532,15 +540,26 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
if (!watch) try sendMessage(io, zp.child.stdin.?, .exit);
var result: ?Path = null;
var eos_err: error{EndOfStream}!void = {};
const stdout = zp.poller.reader(.stdout);
const stdout = zp.multi_reader.fileReader(0);
poll: while (true) {
while (true) {
const Header = std.zig.Server.Message.Header;
while (stdout.buffered().len < @sizeOf(Header)) if (!try zp.poller.poll()) break :poll;
const header = stdout.takeStruct(Header, .little) catch unreachable;
while (stdout.buffered().len < header.bytes_len) if (!try zp.poller.poll()) break :poll;
const body = stdout.take(header.bytes_len) catch unreachable;
const header = stdout.interface.takeStruct(Header, .little) catch |err| switch (err) {
error.EndOfStream => break,
error.ReadFailed => return stdout.err.?,
};
const body = stdout.interface.take(header.bytes_len) catch |err| switch (err) {
error.EndOfStream => |e| {
// Better to report the crash with stderr below, but we set
// this in case the child exits successfully while violating
// this protocol.
eos_err = e;
break;
},
error.ReadFailed => return stdout.err.?,
};
switch (header.tag) {
.zig_version => {
if (!std.mem.eql(u8, builtin.zig_version_string, body)) {
@@ -553,11 +572,11 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
.error_bundle => {
s.result_error_bundle = try std.zig.Server.allocErrorBundle(gpa, body);
// This message indicates the end of the update.
if (watch) break :poll;
if (watch) break;
},
.emit_digest => {
const EmitDigest = std.zig.Server.Message.EmitDigest;
const emit_digest = @as(*align(1) const EmitDigest, @ptrCast(body));
const emit_digest: *align(1) const EmitDigest = @ptrCast(body);
s.result_cached = emit_digest.flags.cache_hit;
const digest = body[@sizeOf(EmitDigest)..][0..Cache.bin_digest_len];
result = .{
@@ -631,11 +650,13 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
s.result_duration_ns = timer.read();
const stderr_contents = try zp.poller.toOwnedSlice(.stderr);
const stderr_contents = zp.multi_reader.reader(1).buffered();
if (stderr_contents.len > 0) {
try s.result_error_msgs.append(arena, try arena.dupe(u8, stderr_contents));
}
try eos_err;
return result;
}
+106 -86
View File
@@ -1385,14 +1385,12 @@ fn runCommand(
break :term spawnChildAndCollect(run, interp_argv.items, &environ_map, has_side_effects, options, fuzz_context) catch |e| {
if (!run.failing_to_execute_foreign_is_an_error) return error.MakeSkipped;
if (e == error.MakeFailed) return error.MakeFailed; // error already reported
return step.fail("unable to spawn interpreter {s}: {s}", .{
interp_argv.items[0], @errorName(e),
});
return step.fail("unable to spawn interpreter {s}: {t}", .{ interp_argv.items[0], e });
};
}
if (err == error.MakeFailed) return error.MakeFailed; // error already reported
return step.fail("failed to spawn and capture stdio from {s}: {s}", .{ argv[0], @errorName(err) });
return step.fail("failed to spawn and capture stdio from {s}: {t}", .{ argv[0], err });
};
const generic_result = opt_generic_result orelse {
@@ -1589,9 +1587,13 @@ fn spawnChildAndCollect(
};
if (run.stdio == .zig_test) {
var timer = try std.time.Timer.start();
defer run.step.result_duration_ns = timer.read();
try evalZigTest(run, spawn_options, options, fuzz_context);
const started: Io.Clock.Timestamp = try .now(io, .awake);
const result = evalZigTest(run, spawn_options, options, fuzz_context) catch |err| switch (err) {
error.Canceled => |e| return e,
else => |e| e,
};
run.step.result_duration_ns = @intCast((try started.untilNow(io)).raw.nanoseconds);
try result;
return null;
} else {
const inherit = spawn_options.stdout == .inherit or spawn_options.stderr == .inherit;
@@ -1604,10 +1606,14 @@ fn spawnChildAndCollect(
} else .no_color;
defer if (inherit) io.unlockStderr();
try setColorEnvironmentVariables(run, environ_map, terminal_mode);
var timer = try std.time.Timer.start();
const res = try evalGeneric(run, spawn_options);
run.step.result_duration_ns = timer.read();
return .{ .term = res.term, .stdout = res.stdout, .stderr = res.stderr };
const started: Io.Clock.Timestamp = try .now(io, .awake);
const result = evalGeneric(run, spawn_options) catch |err| switch (err) {
error.Canceled => |e| return e,
else => |e| e,
};
run.step.result_duration_ns = @intCast((try started.untilNow(io)).raw.nanoseconds);
return try result;
}
}
@@ -1669,39 +1675,42 @@ fn evalZigTest(
while (true) {
var child = try process.spawn(io, spawn_options);
var poller = std.Io.poll(gpa, StdioPollEnum, .{
.stdout = child.stdout.?,
.stderr = child.stderr.?,
});
var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined;
var multi_reader: Io.File.MultiReader = undefined;
multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ child.stdout.?, child.stderr.? });
var child_killed = false;
defer if (!child_killed) {
child.kill(io);
poller.deinit();
multi_reader.deinit();
run.step.result_peak_rss = @max(
run.step.result_peak_rss,
child.resource_usage_statistics.getMaxRss() orelse 0,
);
};
switch (try pollZigTest(
switch (try waitZigTest(
run,
&child,
options,
fuzz_context,
&poller,
&multi_reader,
&test_metadata,
&test_results,
)) {
.write_failed => |err| {
// The runner unexpectedly closed a stdio pipe, which means a crash. Make sure we've captured
// all available stderr to make our error output as useful as possible.
while (try poller.poll()) {}
run.step.result_stderr = try arena.dupe(u8, poller.reader(.stderr).buffered());
const stderr_fr = multi_reader.fileReader(1);
while (stderr_fr.interface.fillMore()) |_| {} else |e| switch (e) {
error.ReadFailed => return stderr_fr.err.?,
error.EndOfStream => {},
}
run.step.result_stderr = try arena.dupe(u8, stderr_fr.interface.buffered());
// Clean up everything and wait for the child to exit.
child.stdin.?.close(io);
child.stdin = null;
poller.deinit();
multi_reader.deinit();
child_killed = true;
const term = try child.wait(io);
run.step.result_peak_rss = @max(
@@ -1716,13 +1725,13 @@ fn evalZigTest(
.no_poll => |no_poll| {
// This might be a success (we requested exit and the child dutifully closed stdout) or
// a crash of some kind. Either way, the child will terminate by itself -- wait for it.
const stderr_owned = try arena.dupe(u8, poller.reader(.stderr).buffered());
poller.reader(.stderr).tossBuffered();
const stderr_reader = multi_reader.reader(1);
const stderr_owned = try arena.dupe(u8, stderr_reader.buffered());
// Clean up everything and wait for the child to exit.
child.stdin.?.close(io);
child.stdin = null;
poller.deinit();
multi_reader.deinit();
child_killed = true;
const term = try child.wait(io);
run.step.result_peak_rss = @max(
@@ -1770,8 +1779,9 @@ fn evalZigTest(
return;
},
.timeout => |timeout| {
const stderr = poller.reader(.stderr).buffered();
poller.reader(.stderr).tossBuffered();
const stderr_reader = multi_reader.reader(1);
const stderr = stderr_reader.buffered();
stderr_reader.tossBuffered();
if (timeout.active_test_index) |test_index| {
// A test was running. Report the timeout against that test, and continue on to
// the next test.
@@ -1796,16 +1806,16 @@ fn evalZigTest(
}
}
/// Polls stdout of a Zig test process until a termination condition is reached:
/// Reads stdout of a Zig test process until a termination condition is reached:
/// * A write fails, indicating the child unexpectedly closed stdin
/// * A test (or a response from the test runner) times out
/// * `poll` fails, indicating the child closed stdout and stderr
fn pollZigTest(
/// * The wait fails, indicating the child closed stdout and stderr
fn waitZigTest(
run: *Run,
child: *process.Child,
options: Step.MakeOptions,
fuzz_context: ?FuzzContext,
poller: *std.Io.Poller(StdioPollEnum),
multi_reader: *Io.File.MultiReader,
opt_metadata: *?TestMetadata,
results: *Step.TestResults,
) !union(enum) {
@@ -1859,9 +1869,7 @@ fn pollZigTest(
var active_test_index: ?u32 = null;
// `null` means this host does not support `std.time.Timer`. This timer is `reset()` whenever we
// change `active_test_index`, i.e. whenever a test starts or finishes.
var timer: ?std.time.Timer = std.time.Timer.start() catch null;
var last_update: Io.Clock.Timestamp = try .now(io, .awake);
var coverage_id: ?u64 = null;
@@ -1869,16 +1877,26 @@ fn pollZigTest(
// test. For instance, if the test runner leaves this much time between us requesting a test to
// start and it acknowledging the test starting, we terminate the child and raise an error. This
// *should* never happen, but could in theory be caused by some very unlucky IB in a test.
const response_timeout_ns: ?u64 = ns: {
if (fuzz_context != null) break :ns null; // don't timeout fuzz tests
break :ns @max(options.unit_test_timeout_ns orelse 0, 60 * std.time.ns_per_s);
const response_timeout: ?Io.Clock.Duration = t: {
if (fuzz_context != null) break :t null; // don't timeout fuzz tests
const ns = @max(options.unit_test_timeout_ns orelse 0, 60 * std.time.ns_per_s);
break :t .{ .clock = .awake, .raw = .fromNanoseconds(ns) };
};
const test_timeout: ?Io.Clock.Duration = if (options.unit_test_timeout_ns) |ns| .{
.clock = .awake,
.raw = .fromNanoseconds(ns),
} else null;
const stdout = poller.reader(.stdout);
const stderr = poller.reader(.stderr);
const stdout = multi_reader.reader(0);
const stderr = multi_reader.reader(1);
const Header = std.zig.Server.Message.Header;
while (true) {
const Header = std.zig.Server.Message.Header;
const timeout: Io.Timeout = t: {
const opt_duration = if (active_test_index == null) response_timeout else test_timeout;
const duration = opt_duration orelse break :t .none;
break :t .{ .deadline = last_update.addDuration(duration) };
};
// This block is exited when `stdout` contains enough bytes for a `Header`.
header_ready: {
@@ -1887,47 +1905,37 @@ fn pollZigTest(
break :header_ready;
}
// Always `null` if `timer` is `null`.
const opt_timeout_ns: ?u64 = ns: {
if (timer == null) break :ns null;
if (active_test_index == null) break :ns response_timeout_ns;
break :ns options.unit_test_timeout_ns;
multi_reader.fill(64, timeout) catch |err| switch (err) {
error.Timeout => return .{ .timeout = .{
.active_test_index = active_test_index,
.ns_elapsed = @intCast((try last_update.untilNow(io)).raw.nanoseconds),
} },
error.EndOfStream => return .{ .no_poll = .{
.active_test_index = active_test_index,
.ns_elapsed = @intCast((try last_update.untilNow(io)).raw.nanoseconds),
} },
else => |e| return e,
};
if (opt_timeout_ns) |timeout_ns| {
const remaining_ns = timeout_ns -| timer.?.read();
if (!try poller.pollTimeout(remaining_ns)) return .{ .no_poll = .{
.active_test_index = active_test_index,
.ns_elapsed = if (timer) |*t| t.read() else 0,
} };
} else {
if (!try poller.poll()) return .{ .no_poll = .{
.active_test_index = active_test_index,
.ns_elapsed = if (timer) |*t| t.read() else 0,
} };
}
if (stdout.buffered().len >= @sizeOf(Header)) {
// There wasn't a header before, but there is one after the `poll`.
break :header_ready;
}
if (opt_timeout_ns) |timeout_ns| {
const cur_ns = timer.?.read();
if (cur_ns >= timeout_ns) return .{ .timeout = .{
.active_test_index = active_test_index,
.ns_elapsed = cur_ns,
} };
}
continue;
}
// There is definitely a header available now -- read it.
const header = stdout.takeStruct(Header, .little) catch unreachable;
while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) return .{ .no_poll = .{
.active_test_index = active_test_index,
.ns_elapsed = if (timer) |*t| t.read() else 0,
} };
while (stdout.buffered().len < header.bytes_len) {
multi_reader.fill(64, timeout) catch |err| switch (err) {
error.Timeout => return .{ .timeout = .{
.active_test_index = active_test_index,
.ns_elapsed = @intCast((try last_update.untilNow(io)).raw.nanoseconds),
} },
error.EndOfStream => return .{ .no_poll = .{
.active_test_index = active_test_index,
.ns_elapsed = @intCast((try last_update.untilNow(io)).raw.nanoseconds),
} },
else => |e| return e,
};
}
const body = stdout.take(header.bytes_len) catch unreachable;
var body_r: std.Io.Reader = .fixed(body);
switch (header.tag) {
@@ -1968,13 +1976,13 @@ fn pollZigTest(
@memset(opt_metadata.*.?.ns_per_test, std.math.maxInt(u64));
active_test_index = null;
if (timer) |*t| t.reset();
last_update = try .now(io, .awake);
requestNextTest(io, child.stdin.?, &opt_metadata.*.?, &sub_prog_node) catch |err| return .{ .write_failed = err };
},
.test_started => {
active_test_index = opt_metadata.*.?.next_index - 1;
if (timer) |*t| t.reset();
last_update = try .now(io, .awake);
},
.test_results => {
assert(fuzz_context == null);
@@ -2017,7 +2025,10 @@ fn pollZigTest(
}
active_test_index = null;
if (timer) |*t| md.ns_per_test[tr_hdr.index] = t.lap();
const now: Io.Clock.Timestamp = try .now(io, .awake);
md.ns_per_test[tr_hdr.index] = @intCast(last_update.durationTo(now).raw.nanoseconds);
last_update = now;
requestNextTest(io, child.stdin.?, md, &sub_prog_node) catch |err| return .{ .write_failed = err };
},
@@ -2164,6 +2175,7 @@ fn evalGeneric(run: *Run, spawn_options: process.SpawnOptions) !EvalGenericResul
const b = run.step.owner;
const io = b.graph.io;
const arena = b.allocator;
const gpa = b.allocator;
var child = try process.spawn(io, spawn_options);
defer child.kill(io);
@@ -2211,23 +2223,31 @@ fn evalGeneric(run: *Run, spawn_options: process.SpawnOptions) !EvalGenericResul
if (child.stdout) |stdout| {
if (child.stderr) |stderr| {
var poller = std.Io.poll(arena, enum { stdout, stderr }, .{
.stdout = stdout,
.stderr = stderr,
});
defer poller.deinit();
var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined;
var multi_reader: Io.File.MultiReader = undefined;
multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ stdout, stderr });
defer multi_reader.deinit();
while (try poller.poll()) {
const stdout_reader = multi_reader.reader(0);
const stderr_reader = multi_reader.reader(1);
while (multi_reader.fill(64, .none)) |_| {
if (run.stdio_limit.toInt()) |limit| {
if (poller.reader(.stderr).buffered().len > limit)
if (stdout_reader.buffered().len > limit)
return error.StdoutStreamTooLong;
if (poller.reader(.stderr).buffered().len > limit)
if (stderr_reader.buffered().len > limit)
return error.StderrStreamTooLong;
}
} else |err| switch (err) {
error.UnsupportedClock, error.Timeout => unreachable,
error.EndOfStream => {},
else => |e| return e,
}
stdout_bytes = try poller.toOwnedSlice(.stdout);
stderr_bytes = try poller.toOwnedSlice(.stderr);
try multi_reader.checkAnyError();
stdout_bytes = try multi_reader.toOwnedSlice(0);
stderr_bytes = try multi_reader.toOwnedSlice(1);
} else {
var stdout_reader = stdout.readerStreaming(io, &.{});
stdout_bytes = stdout_reader.interface.allocRemaining(arena, run.stdio_limit) catch |err| switch (err) {
+23 -13
View File
@@ -588,11 +588,12 @@ fn buildClientWasm(ws: *WebServer, arena: Allocator, optimize: std.builtin.Optim
});
defer child.kill(io);
var poller = Io.poll(gpa, enum { stdout, stderr }, .{
.stdout = child.stdout.?,
.stderr = child.stderr.?,
});
defer poller.deinit();
var stderr_task = try io.concurrent(readStreamAlloc, .{ gpa, io, child.stderr.?, .unlimited });
defer if (stderr_task.cancel(io)) |slice| gpa.free(slice) else |_| {};
var stdout_buffer: [512]u8 = undefined;
var stdout_reader: Io.File.Reader = .initStreaming(child.stdout.?, io, &stdout_buffer);
const stdout = &stdout_reader.interface;
try child.stdin.?.writeStreamingAll(io, @ptrCast(@as([]const std.zig.Client.Message.Header, &.{
.{ .tag = .update, .bytes_len = 0 },
@@ -600,16 +601,17 @@ fn buildClientWasm(ws: *WebServer, arena: Allocator, optimize: std.builtin.Optim
})));
const Header = std.zig.Server.Message.Header;
var result: ?Cache.Path = null;
var result_error_bundle = std.zig.ErrorBundle.empty;
var body_buffer: std.ArrayList(u8) = .empty;
defer body_buffer.deinit(gpa);
const stdout = poller.reader(.stdout);
poll: while (true) {
while (stdout.buffered().len < @sizeOf(Header)) if (!(try poller.poll())) break :poll;
const header = stdout.takeStruct(Header, .little) catch unreachable;
while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll;
const body = stdout.take(header.bytes_len) catch unreachable;
while (true) {
const header = try stdout.takeStruct(Header, .little);
body_buffer.clearRetainingCapacity();
try stdout.appendExact(gpa, &body_buffer, header.bytes_len);
const body = body_buffer.items;
switch (header.tag) {
.zig_version => {
@@ -636,7 +638,7 @@ fn buildClientWasm(ws: *WebServer, arena: Allocator, optimize: std.builtin.Optim
}
}
const stderr_contents = try poller.toOwnedSlice(.stderr);
const stderr_contents = try stderr_task.await(io);
if (stderr_contents.len > 0) {
std.debug.print("{s}", .{stderr_contents});
}
@@ -697,6 +699,14 @@ fn buildClientWasm(ws: *WebServer, arena: Allocator, optimize: std.builtin.Optim
return base_path.join(arena, bin_name);
}
fn readStreamAlloc(gpa: Allocator, io: Io, file: Io.File, limit: Io.Limit) ![]u8 {
var file_reader: Io.File.Reader = .initStreaming(file, io, &.{});
return file_reader.interface.allocRemaining(gpa, limit) catch |err| switch (err) {
error.ReadFailed => return file_reader.err.?,
else => |e| return e,
};
}
pub fn updateTimeReportCompile(ws: *WebServer, opts: struct {
compile: *Build.Step.Compile,
+241 -461
View File
@@ -15,463 +15,13 @@
const Io = @This();
const builtin = @import("builtin");
const is_windows = builtin.os.tag == .windows;
const std = @import("std.zig");
const windows = std.os.windows;
const posix = std.posix;
const math = std.math;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;
/// Initializes a `Poller` over the files named by `StreamEnum`'s fields.
/// On Windows, zeroes per-stream OVERLAPPED state and prepares the set of
/// candidate handles for `WaitForMultipleObjects`; elsewhere, fills a
/// `pollfd` entry (with `POLL.IN`) per file for `posix.poll`.
/// Reader buffers start empty (`.failing`) and are grown with `gpa` as data
/// arrives — presumably via the polling helpers; confirm with `Poller` body.
pub fn poll(
gpa: Allocator,
comptime StreamEnum: type,
files: PollFiles(StreamEnum),
) Poller(StreamEnum) {
const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
var result: Poller(StreamEnum) = .{
.gpa = gpa,
.readers = @splat(.failing),
.poll_fds = undefined,
.windows = if (is_windows) .{
.first_read_done = false,
.overlapped = [1]windows.OVERLAPPED{
std.mem.zeroes(windows.OVERLAPPED),
} ** enum_fields.len,
.small_bufs = undefined,
.active = .{
.count = 0,
.handles_buf = undefined,
.stream_map = undefined,
},
} else {},
};
// Record each stream's OS handle: on Windows into the candidate handle
// list, otherwise into the poll_fds array consumed by posix.poll.
inline for (enum_fields, 0..) |field, i| {
if (is_windows) {
result.windows.active.handles_buf[i] = @field(files, field.name).handle;
} else {
result.poll_fds[i] = .{
.fd = @field(files, field.name).handle,
.events = posix.POLL.IN,
.revents = undefined,
};
}
}
return result;
}
pub fn Poller(comptime StreamEnum: type) type {
return struct {
const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
const PollFd = if (is_windows) void else posix.pollfd;
gpa: Allocator,
readers: [enum_fields.len]Reader,
poll_fds: [enum_fields.len]PollFd,
windows: if (is_windows) struct {
first_read_done: bool,
overlapped: [enum_fields.len]windows.OVERLAPPED,
small_bufs: [enum_fields.len][128]u8,
active: struct {
count: math.IntFittingRange(0, enum_fields.len),
handles_buf: [enum_fields.len]windows.HANDLE,
stream_map: [enum_fields.len]StreamEnum,
/// Removes the active entry at `index`, shifting the remaining
/// `handles_buf`/`stream_map` entries left by one so relative order
/// is preserved. Asserts `index` is within the active count.
pub fn removeAt(self: *@This(), index: u32) void {
assert(index < self.count);
for (index + 1..self.count) |i| {
self.handles_buf[i - 1] = self.handles_buf[i];
self.stream_map[i - 1] = self.stream_map[i];
}
self.count -= 1;
}
},
} else void,
const Self = @This();
/// Frees every reader's buffer and invalidates `self`.
/// On Windows, any in-flight overlapped reads are canceled first so the
/// kernel cannot write into OVERLAPPED state that is about to be discarded.
pub fn deinit(self: *Self) void {
const gpa = self.gpa;
if (is_windows) {
// cancel any pending IO to prevent clobbering OVERLAPPED value
for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| {
_ = windows.kernel32.CancelIo(h);
}
}
inline for (&self.readers) |*r| gpa.free(r.buffer);
self.* = undefined;
}
/// Waits indefinitely for activity on the streams, buffering whatever data
/// is available. Returns `false` once no stream remains open to poll (see
/// `pollPosix`/`pollWindows` for the platform-specific semantics).
pub fn poll(self: *Self) !bool {
    return if (is_windows)
        pollWindows(self, null)
    else
        pollPosix(self, null);
}
/// Same as `poll`, but waits at most `nanoseconds` for activity before
/// returning (the platform helpers convert this to milliseconds).
pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool {
    return if (is_windows)
        pollWindows(self, nanoseconds)
    else
        pollPosix(self, nanoseconds);
}
/// Returns the buffered `Reader` associated with stream `which`.
pub fn reader(self: *Self, which: StreamEnum) *Reader {
    const stream_index = @intFromEnum(which);
    return &self.readers[stream_index];
}
/// Transfers ownership of the data buffered for stream `which` to the
/// caller, leaving that stream's reader with an empty buffer.
/// Caller owns the returned slice and frees it with this poller's `gpa`.
pub fn toOwnedSlice(self: *Self, which: StreamEnum) error{OutOfMemory}![]u8 {
const gpa = self.gpa;
const r = reader(self, which);
// Fast path: nothing consumed yet (seek == 0), so shrink the existing
// buffer in place to exactly the buffered length and hand it out.
if (r.seek == 0) {
const new = try gpa.realloc(r.buffer, r.end);
r.buffer = &.{};
r.end = 0;
return new;
}
// Slow path: some bytes were already consumed; copy only the remaining
// buffered bytes and release the old backing buffer.
const new = try gpa.dupe(u8, r.buffered());
gpa.free(r.buffer);
r.buffer = &.{};
r.seek = 0;
r.end = 0;
return new;
}
/// Windows implementation of `poll`/`pollTimeout`, built on overlapped
/// `ReadFile` plus `WaitForMultipleObjects`.
/// `nanoseconds == null` waits indefinitely. Returns `true` while any
/// stream remains active (including on timeout); `false` once every
/// stream has closed.
fn pollWindows(self: *Self, nanoseconds: ?u64) !bool {
const bump_amt = 512;
const gpa = self.gpa;
// First call only: kick off an initial read on every stream and build
// the active handle set from the streams that are still open.
if (!self.windows.first_read_done) {
var already_read_data = false;
for (0..enum_fields.len) |i| {
const handle = self.windows.active.handles_buf[i];
switch (try windowsAsyncReadToFifoAndQueueSmallRead(
gpa,
handle,
&self.windows.overlapped[i],
&self.readers[i],
&self.windows.small_bufs[i],
bump_amt,
)) {
.populated, .empty => |state| {
if (state == .populated) already_read_data = true;
self.windows.active.handles_buf[self.windows.active.count] = handle;
self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i));
self.windows.active.count += 1;
},
.closed => {}, // don't add to the wait_objects list
.closed_populated => {
// don't add to the wait_objects list, but we did already get data
already_read_data = true;
},
}
}
self.windows.first_read_done = true;
if (already_read_data) return true;
}
while (true) {
if (self.windows.active.count == 0) return false;
const status = windows.kernel32.WaitForMultipleObjects(
self.windows.active.count,
&self.windows.active.handles_buf,
0,
// Convert ns to ms, clamping below INFINITE so a huge timeout is
// not accidentally interpreted as "wait forever".
if (nanoseconds) |ns|
@min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1)
else
windows.INFINITE,
);
if (status == windows.WAIT_FAILED)
return windows.unexpectedError(windows.GetLastError());
if (status == windows.WAIT_TIMEOUT)
return true;
if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1)
unreachable;
// Map the signaled wait object back to its stream.
const active_idx = status - windows.WAIT_OBJECT_0;
const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]);
const handle = self.windows.active.handles_buf[active_idx];
const overlapped = &self.windows.overlapped[stream_idx];
const stream_reader = &self.readers[stream_idx];
const small_buf = &self.windows.small_bufs[stream_idx];
// Collect the completed small read; a closed handle is dropped from
// the active set and we wait again.
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
.success => |n| n,
.closed => {
self.windows.active.removeAt(active_idx);
continue;
},
.aborted => unreachable,
};
const buf = small_buf[0..num_bytes_read];
const dest = try writableSliceGreedyAlloc(stream_reader, gpa, buf.len);
@memcpy(dest[0..buf.len], buf);
advanceBufferEnd(stream_reader, buf.len);
// Drain anything else available and re-queue the next small read.
switch (try windowsAsyncReadToFifoAndQueueSmallRead(
gpa,
handle,
overlapped,
stream_reader,
small_buf,
bump_amt,
)) {
.empty => {}, // irrelevant, we already got data from the small buffer
.populated => {},
.closed,
.closed_populated, // identical, since we already got data from the small buffer
=> self.windows.active.removeAt(active_idx),
}
return true;
}
}
/// POSIX implementation of `poll`/`pollTimeout` using `posix.poll`.
/// Streams that hit EOF or signal an error are marked with `fd = -1` so
/// subsequent calls skip them. Returns `false` once no fd remains
/// readable; `nanoseconds == null` waits indefinitely (-1 timeout).
fn pollPosix(self: *Self, nanoseconds: ?u64) !bool {
const gpa = self.gpa;
// We ask for ensureUnusedCapacity with this much extra space. This
// has more of an effect on small reads because once the reads
// start to get larger the amount of space an ArrayList will
// allocate grows exponentially.
const bump_amt = 512;
const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP;
// ns -> ms conversion; an overflowing timeout saturates at maxInt(i32).
const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns|
std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32)
else
-1);
// Timed out: report true only if at least one fd is still live.
if (events_len == 0) {
for (self.poll_fds) |poll_fd| {
if (poll_fd.fd != -1) return true;
} else return false;
}
var keep_polling = false;
for (&self.poll_fds, &self.readers) |*poll_fd, *r| {
// Try reading whatever is available before checking the error
// conditions.
// It's still possible to read after a POLL.HUP is received,
// always check if there's some data waiting to be read first.
if (poll_fd.revents & posix.POLL.IN != 0) {
const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) {
error.BrokenPipe => 0, // Handle the same as EOF.
else => |e| return e,
};
advanceBufferEnd(r, amt);
if (amt == 0) {
// Remove the fd when the EOF condition is met.
poll_fd.fd = -1;
} else {
keep_polling = true;
}
} else if (poll_fd.revents & err_mask != 0) {
// Exclude the fds that signaled an error.
poll_fd.fd = -1;
} else if (poll_fd.fd != -1) {
keep_polling = true;
}
}
return keep_polling;
}
/// Returns a slice into the unused capacity of `buffer` with at least
/// `min_len` bytes, extending `buffer` by resizing it with `allocator` as necessary.
///
/// After calling this function, typically the caller will follow up with a
/// call to `advanceBufferEnd` to report the actual number of bytes buffered.
fn writableSliceGreedyAlloc(r: *Reader, allocator: Allocator, min_len: usize) Allocator.Error![]u8 {
{
// Fast path: existing unused capacity already satisfies the request.
const unused = r.buffer[r.end..];
if (unused.len >= min_len) return unused;
}
// Compact: shift the unread bytes to the front of the buffer so the
// space occupied by already-consumed bytes can be reused.
if (r.seek > 0) {
const data = r.buffer[r.seek..r.end];
@memmove(r.buffer[0..data.len], data);
r.seek = 0;
r.end = data.len;
}
{
// Grow through an ArrayList view so capacity expands with its growth
// policy; the defer stores the (possibly reallocated) backing slice
// back into the reader even if ensureUnusedCapacity fails.
var list: std.ArrayList(u8) = .{
.items = r.buffer[0..r.end],
.capacity = r.buffer.len,
};
defer r.buffer = list.allocatedSlice();
try list.ensureUnusedCapacity(allocator, min_len);
}
const unused = r.buffer[r.end..];
assert(unused.len >= min_len);
return unused;
}
/// Records that `n` bytes were written directly into the unused capacity of
/// `buffer`, advancing `end` so the data becomes visible to `Reader` users.
/// Asserts that `n` does not exceed the remaining capacity.
fn advanceBufferEnd(r: *Reader, n: usize) void {
    const unused_capacity = r.buffer.len - r.end;
    assert(n <= unused_capacity);
    r.end += n;
}
/// The `ReadFile` documentation states that `lpNumberOfBytesRead` does not have a meaningful
/// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For
/// compatibility, we point it to this dummy variable, which we never otherwise access.
/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
var win_dummy_bytes_read: u32 = undefined;
/// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before
/// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data
/// is available. `handle` must have no pending asynchronous operation.
///
/// Return value reports whether any data reached the FIFO and whether the
/// handle closed: `.empty`/`.populated` mean a small read is now pending;
/// `.closed`/`.closed_populated` mean the pipe closed and nothing is queued.
fn windowsAsyncReadToFifoAndQueueSmallRead(
gpa: Allocator,
handle: windows.HANDLE,
overlapped: *windows.OVERLAPPED,
r: *Reader,
small_buf: *[128]u8,
bump_amt: usize,
) !enum { empty, populated, closed_populated, closed } {
var read_any_data = false;
while (true) {
// Read synchronously into the FIFO for as long as ReadFile completes
// immediately; `true` means a read was left pending (IO_PENDING).
const fifo_read_pending = while (true) {
const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32);
if (0 == windows.kernel32.ReadFile(
handle,
buf.ptr,
buf_len,
&win_dummy_bytes_read,
overlapped,
)) switch (windows.GetLastError()) {
.IO_PENDING => break true,
.BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
else => |err| return windows.unexpectedError(err),
};
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
.success => |n| n,
.closed => return if (read_any_data) .closed_populated else .closed,
.aborted => unreachable,
};
read_any_data = true;
advanceBufferEnd(r, num_bytes_read);
if (num_bytes_read == buf_len) {
// We filled the buffer, so there's probably more data available.
continue;
} else {
// We didn't fill the buffer, so assume we're out of data.
// There is no pending read.
break false;
}
};
if (fifo_read_pending) cancel_read: {
// Cancel the pending read into the FIFO.
_ = windows.kernel32.CancelIo(handle);
// We have to wait for the handle to be signalled, i.e. for the cancelation to complete.
switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
windows.WAIT_OBJECT_0 => {},
windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
else => unreachable,
}
// If it completed before we canceled, make sure to tell the FIFO!
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) {
.success => |n| n,
.closed => return if (read_any_data) .closed_populated else .closed,
.aborted => break :cancel_read,
};
read_any_data = true;
advanceBufferEnd(r, num_bytes_read);
}
// Try to queue the small read into `small_buf`.
if (0 == windows.kernel32.ReadFile(
handle,
small_buf,
small_buf.len,
&win_dummy_bytes_read,
overlapped,
)) switch (windows.GetLastError()) {
.IO_PENDING => {
// small read into `small_buf` pending as intended
return if (read_any_data) .populated else .empty;
},
.BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
else => |err| return windows.unexpectedError(err),
};
// We got data back this time. Write it to the FIFO and run the main loop again.
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
.success => |n| n,
.closed => return if (read_any_data) .closed_populated else .closed,
.aborted => unreachable,
};
const buf = small_buf[0..num_bytes_read];
const dest = try writableSliceGreedyAlloc(r, gpa, buf.len);
@memcpy(dest[0..buf.len], buf);
advanceBufferEnd(r, buf.len);
read_any_data = true;
}
}
/// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation.
/// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected).
///
/// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the
/// operation immediately returns data:
/// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially
/// erroneous results."
/// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...]
/// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to
/// get the actual number of bytes read."
/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
fn windowsGetReadResult(
    handle: windows.HANDLE,
    overlapped: *windows.OVERLAPPED,
    allow_aborted: bool,
) !union(enum) {
    success: u32,
    closed,
    aborted,
} {
    var n: u32 = undefined;
    // Nonzero return means the operation completed successfully; `n` holds
    // the byte count. Last parameter 0: do not wait for completion.
    const rc = windows.kernel32.GetOverlappedResult(handle, overlapped, &n, 0);
    if (rc != 0) return .{ .success = n };
    switch (windows.GetLastError()) {
        .BROKEN_PIPE => return .closed,
        .OPERATION_ABORTED => |err| {
            if (!allow_aborted) return windows.unexpectedError(err);
            return .aborted;
        },
        else => |err| return windows.unexpectedError(err),
    }
}
};
}
/// Given an enum, returns a struct with fields of that enum, each field
/// representing an I/O stream for polling.
pub fn PollFiles(comptime StreamEnum: type) type {
    const field_names = std.meta.fieldNames(StreamEnum);
    return @Struct(.auto, null, field_names, &@splat(Io.File), &@splat(.{}));
}
userdata: ?*anyopaque,
vtable: *const VTable,
@@ -599,6 +149,11 @@ pub const VTable = struct {
futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
operate: *const fn (?*anyopaque, Operation) Cancelable!Operation.Result,
batchAwaitAsync: *const fn (?*anyopaque, *Batch) Cancelable!void,
batchAwaitConcurrent: *const fn (?*anyopaque, *Batch, Timeout) Batch.AwaitConcurrentError!void,
batchCancel: *const fn (?*anyopaque, *Batch) void,
dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void,
dirCreateDirPath: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirPathError!Dir.CreatePathStatus,
dirCreateDirPathOpen: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions, Dir.OpenOptions) Dir.CreateDirPathOpenError!Dir,
@@ -633,9 +188,7 @@ pub const VTable = struct {
fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize,
fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize,
fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize,
/// Returns 0 on end of stream.
fileReadStreaming: *const fn (?*anyopaque, File, data: []const []u8) File.Reader.Error!usize,
/// Returns 0 on end of stream.
/// Returns 0 if reading at or past the end.
fileReadPositional: *const fn (?*anyopaque, File, data: []const []u8, offset: u64) File.ReadPositionalError!usize,
fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void,
fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void,
@@ -702,20 +255,247 @@ pub const VTable = struct {
netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void,
};
/// A single I/O operation that can be performed directly via `operate` or
/// queued into a `Batch`.
pub const Operation = union(enum) {
    file_read_streaming: FileReadStreaming,

    pub const Tag = @typeInfo(Operation).@"union".tag_type.?;

    /// May return 0 reads which is different from `error.EndOfStream`.
    pub const FileReadStreaming = struct {
        file: File,
        data: []const []u8,

        pub const Error = UnendingError || error{EndOfStream};
        pub const UnendingError = error{
            InputOutput,
            SystemResources,
            /// Trying to read a directory file descriptor as if it were a file.
            IsDir,
            ConnectionResetByPeer,
            /// File was not opened with read capability.
            NotOpenForReading,
            SocketUnconnected,
            /// Non-blocking has been enabled, and reading from the file descriptor
            /// would block.
            WouldBlock,
            /// In WASI, this error occurs when the file descriptor does
            /// not hold the required rights to read from it.
            AccessDenied,
            /// Unable to read file due to lock. Depending on the `Io` implementation,
            /// reading from a locked file may return this error, or may ignore the
            /// lock.
            LockViolation,
        } || Io.UnexpectedError;
        pub const Result = usize;
    };

    /// Union with the same tags as `Operation`; each field's type is the
    /// corresponding operation's `Error!Result`, built via reflection.
    pub const Result = Result: {
        const operation_fields = @typeInfo(Operation).@"union".fields;
        var field_names: [operation_fields.len][]const u8 = undefined;
        var field_types: [operation_fields.len]type = undefined;
        for (operation_fields, &field_names, &field_types) |field, *field_name, *field_type| {
            field_name.* = field.name;
            field_type.* = field.type.Error!field.type.Result;
        }
        break :Result @Union(.auto, Tag, &field_names, &field_types, &@splat(.{}));
    };

    /// Per-slot storage used by `Batch`. Which field is active corresponds to
    /// which `Batch` list (unused/submissions/pending/completions) currently
    /// links the slot.
    pub const Storage = union {
        unused: List.DoubleNode,
        submission: Submission,
        pending: Pending,
        completion: Completion,

        pub const Submission = struct {
            node: List.SingleNode,
            operation: Operation,
        };
        pub const Pending = struct {
            node: List.DoubleNode,
            tag: Tag,
            // Scratch space; semantics defined by the `Io` implementation.
            context: [3]usize,
        };
        pub const Completion = struct {
            node: List.SingleNode,
            result: Result,
        };
    };

    /// Index into `Batch.storage`, with `maxInt(u32)` reserved to mean "none".
    pub const OptionalIndex = enum(u32) {
        none = std.math.maxInt(u32),
        _,

        pub fn fromIndex(i: usize) OptionalIndex {
            const oi: OptionalIndex = @enumFromInt(i);
            assert(oi != .none);
            return oi;
        }

        pub fn toIndex(oi: OptionalIndex) u32 {
            assert(oi != .none);
            return @intFromEnum(oi);
        }
    };

    /// Intrusive linked list; the nodes live inside `Storage` slots and are
    /// addressed by index rather than pointer.
    pub const List = struct {
        head: OptionalIndex,
        tail: OptionalIndex,

        pub const empty: List = .{ .head = .none, .tail = .none };

        pub const SingleNode = struct { next: OptionalIndex };
        pub const DoubleNode = struct { prev: OptionalIndex, next: OptionalIndex };
    };
};
/// Performs one `Operation`.
pub fn operate(io: Io, operation: Operation) Cancelable!Operation.Result {
    const vtable = io.vtable;
    return vtable.operate(io.userdata, operation);
}
/// Submits many operations together without waiting for all of them to
/// complete.
///
/// This is a low-level abstraction based on `Operation`. For a higher
/// level API that operates on `Future`, see `Select` and `Group`.
pub const Batch = struct {
    storage: []Operation.Storage,
    /// Doubly-linked list of slots available for `add`/`addAt`.
    unused: Operation.List,
    /// Queue of operations added but not yet handed to the implementation.
    submissions: Operation.List,
    /// Operations currently in flight.
    pending: Operation.List,
    /// Completed operations awaiting iteration via `next`.
    completions: Operation.List,
    /// Reserved for use by the `Io` implementation.
    context: ?*anyopaque,

    /// After calling this, it is safe to unconditionally defer a call to
    /// `cancel`.
    pub fn init(storage: []Operation.Storage) Batch {
        // An empty batch is valid; it simply has no slots to `add`.
        if (storage.len == 0) return .{
            .storage = storage,
            .unused = .empty,
            .submissions = .empty,
            .pending = .empty,
            .completions = .empty,
            .context = null,
        };
        // Thread every slot onto the unused list in index order.
        var prev: Operation.OptionalIndex = .none;
        for (storage, 0..) |*operation, index| {
            operation.* = .{ .unused = .{ .prev = prev, .next = .fromIndex(index + 1) } };
            prev = .fromIndex(index);
        }
        storage[storage.len - 1].unused.next = .none;
        return .{
            .storage = storage,
            .unused = .{
                .head = .fromIndex(0),
                .tail = .fromIndex(storage.len - 1),
            },
            .submissions = .empty,
            .pending = .empty,
            .completions = .empty,
            .context = null,
        };
    }

    /// Adds an operation to be performed at the next await call.
    /// Returns the index that will be returned by `next` after the operation completes.
    /// Asserts that no more than `storage.len` operations are active at a time.
    pub fn add(b: *Batch, operation: Operation) u32 {
        // Take the first free slot; `toIndex` asserts one is available.
        // (Previously this read the nonexistent field `b.unused.next` and
        // returned an `OptionalIndex` where `u32` was declared.)
        const index = b.unused.head.toIndex();
        b.addAt(index, operation);
        return index;
    }

    /// Adds an operation to be performed at the next await call.
    /// After the operation completes, `next` will return `index`.
    /// Asserts that the operation at `index` is not active.
    pub fn addAt(b: *Batch, index: u32, operation: Operation) void {
        const storage = &b.storage[index];
        const unused = storage.unused;
        // Unlink this slot from the doubly-linked unused list. When the slot
        // is the head (or tail), the list head (or tail) must become the
        // slot's neighbor, not `.none`, or the remaining free slots leak.
        switch (unused.prev) {
            .none => b.unused.head = unused.next,
            else => |prev_index| b.storage[prev_index.toIndex()].unused.next = unused.next,
        }
        switch (unused.next) {
            .none => b.unused.tail = unused.prev,
            else => |next_index| b.storage[next_index.toIndex()].unused.prev = unused.prev,
        }
        // Append the slot to the tail of the submissions queue.
        switch (b.submissions.tail) {
            .none => b.submissions.head = .fromIndex(index),
            else => |tail_index| b.storage[tail_index.toIndex()].submission.node.next = .fromIndex(index),
        }
        storage.* = .{ .submission = .{ .node = .{ .next = .none }, .operation = operation } };
        b.submissions.tail = .fromIndex(index);
    }

    /// After calling `awaitAsync`, `awaitConcurrent`, or `cancel`, this
    /// function iterates over the completed operations.
    ///
    /// Each completion returned from this function dequeues from the `Batch`.
    /// It is not required to dequeue all completions before awaiting again.
    pub fn next(b: *Batch) ?struct { index: u32, result: Operation.Result } {
        const index = b.completions.head;
        if (index == .none) return null;
        const storage = &b.storage[index.toIndex()];
        const completion = storage.completion;
        const next_index = completion.node.next;
        b.completions.head = next_index;
        if (next_index == .none) b.completions.tail = .none;
        // Return the slot to the tail of the unused list.
        const tail_index = b.unused.tail;
        switch (tail_index) {
            .none => b.unused.head = index,
            else => b.storage[tail_index.toIndex()].unused.next = index,
        }
        storage.* = .{ .unused = .{ .prev = tail_index, .next = .none } };
        b.unused.tail = index;
        return .{ .index = index.toIndex(), .result = completion.result };
    }

    /// Waits for at least one of the submitted operations to complete. After
    /// this function returns the completed operations can be iterated with
    /// `next`.
    ///
    /// This function provides opportunity for the implementation to introduce
    /// concurrency into the batched operations, but unlike `awaitConcurrent`,
    /// does not require it, and therefore cannot fail with
    /// `error.ConcurrencyUnavailable`.
    pub fn awaitAsync(b: *Batch, io: Io) Cancelable!void {
        return io.vtable.batchAwaitAsync(io.userdata, b);
    }

    pub const AwaitConcurrentError = ConcurrentError || Cancelable || Timeout.Error;

    /// Waits for at least one of the submitted operations to complete. After
    /// this function returns the completed operations can be iterated with
    /// `next`.
    ///
    /// Unlike `awaitAsync`, this function requires the implementation to
    /// perform the operations concurrently and therefore can fail with
    /// `error.ConcurrencyUnavailable`.
    pub fn awaitConcurrent(b: *Batch, io: Io, timeout: Timeout) AwaitConcurrentError!void {
        return io.vtable.batchAwaitConcurrent(io.userdata, b, timeout);
    }

    /// Requests all pending operations to be interrupted, then waits for all
    /// pending operations to complete. After this returns, the `Batch` is in a
    /// well-defined state, ready to be iterated with `next`. Successfully
    /// canceled operations will be absent from the iteration. Some operations
    /// may have successfully completed regardless of the cancel request and
    /// will appear in the iteration.
    pub fn cancel(b: *Batch, io: Io) void {
        return io.vtable.batchCancel(io.userdata, b);
    }
};
pub const Limit = enum(usize) {
nothing = 0,
unlimited = std.math.maxInt(usize),
unlimited = math.maxInt(usize),
_,
/// `std.math.maxInt(usize)` is interpreted to mean `.unlimited`.
/// `math.maxInt(usize)` is interpreted to mean `.unlimited`.
pub fn limited(n: usize) Limit {
return @enumFromInt(n);
}
/// Any value greater than `std.math.maxInt(usize)` is interpreted to mean
/// Any value greater than `math.maxInt(usize)` is interpreted to mean
/// `.unlimited`.
pub fn limited64(n: u64) Limit {
return @enumFromInt(@min(n, std.math.maxInt(usize)));
return @enumFromInt(@min(n, math.maxInt(usize)));
}
pub fn countVec(data: []const []const u8) Limit {
@@ -929,9 +709,9 @@ pub const Clock = enum {
};
}
pub fn compare(lhs: Clock.Timestamp, op: std.math.CompareOperator, rhs: Clock.Timestamp) bool {
pub fn compare(lhs: Clock.Timestamp, op: math.CompareOperator, rhs: Clock.Timestamp) bool {
assert(lhs.clock == rhs.clock);
return std.math.compare(lhs.raw.nanoseconds, op, rhs.raw.nanoseconds);
return math.compare(lhs.raw.nanoseconds, op, rhs.raw.nanoseconds);
}
};
@@ -996,7 +776,7 @@ pub const Duration = struct {
nanoseconds: i96,
pub const zero: Duration = .{ .nanoseconds = 0 };
pub const max: Duration = .{ .nanoseconds = std.math.maxInt(i96) };
pub const max: Duration = .{ .nanoseconds = math.maxInt(i96) };
pub fn fromNanoseconds(x: i96) Duration {
return .{ .nanoseconds = x };
@@ -1652,7 +1432,7 @@ pub const Event = enum(u32) {
pub fn set(e: *Event, io: Io) void {
switch (@atomicRmw(Event, e, .Xchg, .is_set, .release)) {
.unset, .is_set => {},
.waiting => io.futexWake(Event, e, std.math.maxInt(u32)),
.waiting => io.futexWake(Event, e, math.maxInt(u32)),
}
}
+29 -3
View File
@@ -10,6 +10,18 @@ const assert = std.debug.assert;
const Dir = std.Io.Dir;
handle: Handle,
flags: Flags,
pub const Flags = struct {
/// * true:
/// - windows: opened with MODE.IO.ASYNCHRONOUS
/// - POSIX: O_NONBLOCK is set
/// * false:
/// - windows: opened with SYNCHRONOUS_ALERT or SYNCHRONOUS_NONALERT, or
/// not a file.
/// - POSIX: O_NONBLOCK is unset
nonblocking: bool,
};
pub const Handle = std.posix.fd_t;
@@ -18,6 +30,9 @@ pub const Writer = @import("File/Writer.zig");
pub const Atomic = @import("File/Atomic.zig");
/// Memory intended to remain consistent with file contents.
pub const MemoryMap = @import("File/MemoryMap.zig");
/// Concurrently read from multiple file streams, eliminating risk of
/// deadlocking.
pub const MultiReader = @import("File/MultiReader.zig");
pub const INode = std.posix.ino_t;
pub const NLink = std.posix.nlink_t;
@@ -77,9 +92,11 @@ pub fn stdout() File {
return switch (native_os) {
.windows => .{
.handle = std.os.windows.peb().ProcessParameters.hStdOutput,
.flags = .{ .nonblocking = false },
},
else => .{
.handle = std.posix.STDOUT_FILENO,
.flags = .{ .nonblocking = false },
},
};
}
@@ -88,9 +105,11 @@ pub fn stderr() File {
return switch (native_os) {
.windows => .{
.handle = std.os.windows.peb().ProcessParameters.hStdError,
.flags = .{ .nonblocking = false },
},
else => .{
.handle = std.posix.STDERR_FILENO,
.flags = .{ .nonblocking = false },
},
};
}
@@ -99,9 +118,11 @@ pub fn stdin() File {
return switch (native_os) {
.windows => .{
.handle = std.os.windows.peb().ProcessParameters.hStdInput,
.flags = .{ .nonblocking = false },
},
else => .{
.handle = std.posix.STDIN_FILENO,
.flags = .{ .nonblocking = false },
},
};
}
@@ -549,12 +570,18 @@ pub fn setTimestampsNow(file: File, io: Io) SetTimestampsError!void {
});
}
pub const ReadStreamingError = error{EndOfStream} || Reader.Error;
/// Returns 0 on stream end or if `buffer` has no space available for data.
///
/// See also:
/// * `reader`
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usize {
return io.vtable.fileReadStreaming(io.userdata, file, buffer);
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize {
const result = try io.operate(.{ .file_read_streaming = .{
.file = file,
.data = buffer,
} });
return result.file_read_streaming;
}
pub const ReadPositionalError = error{
@@ -562,7 +589,6 @@ pub const ReadPositionalError = error{
SystemResources,
/// Trying to read a directory file descriptor as if it were a file.
IsDir,
BrokenPipe,
/// Non-blocking has been enabled, and reading from the file descriptor
/// would block.
WouldBlock,
+269
View File
@@ -0,0 +1,269 @@
const MultiReader = @This();
const std = @import("../../std.zig");
const Io = std.Io;
const File = Io.File;
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
gpa: Allocator,
streams: *Streams,
batch: Io.Batch,
/// Per-stream state tying a `File.Reader` back to its owning `MultiReader`.
pub const Context = struct {
    /// Back-reference to the owning `MultiReader`.
    mr: *MultiReader,
    /// The stream's file reader; its `interface` is backed by this file's
    /// vtable functions (`stream`, `discard`, `readVec`, `rebase`).
    fr: File.Reader,
    /// Single-element vector aimed at the unused portion of the stream's
    /// buffer; the target of the queued `file_read_streaming` operation.
    vec: [1][]u8,
    /// Sticky first error observed on this stream, if any.
    err: ?Error,
};
pub const Error = UnendingError || error{EndOfStream};
pub const UnendingError = Allocator.Error || File.Reader.Error || Io.ConcurrentError;
/// Trailing:
/// * `contexts: [len]Context`
/// * `storage: [len]Io.Operation.Storage`
pub const Streams = extern struct {
    len: u32,

    /// Returns the `Context` array stored immediately after `len`, aligned
    /// up to `@alignOf(Context)`.
    pub fn contexts(s: *Streams) []Context {
        const base: usize = @intFromPtr(s);
        const ptr: [*]Context = @ptrFromInt(std.mem.alignForward(usize, base + @sizeOf(Streams), @alignOf(Context)));
        return ptr[0..s.len];
    }

    /// Returns the operation storage array trailing the contexts array,
    /// aligned up to `@alignOf(Io.Operation.Storage)`.
    pub fn storage(s: *Streams) []Io.Operation.Storage {
        const prev = contexts(s);
        const end = prev.ptr + prev.len;
        const ptr: [*]Io.Operation.Storage = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(Io.Operation.Storage)));
        return ptr[0..s.len];
    }
};
/// Fixed-size backing storage for a `Streams` of up to `n` streams; the
/// field layout mirrors the trailing data described on `Streams`.
pub fn Buffer(comptime n: usize) type {
    return extern struct {
        len: u32,
        // Raw byte storage; `Streams.contexts`/`Streams.storage` recover the
        // typed slices via aligned pointer arithmetic.
        contexts: [n][@sizeOf(Context)]u8 align(@alignOf(Context)),
        storage: [n][@sizeOf(Io.Operation.Storage)]u8 align(@alignOf(Io.Operation.Storage)),

        /// Stamps `len` and reinterprets this buffer as a `*Streams`.
        pub fn toStreams(b: *@This()) *Streams {
            b.len = n;
            return @ptrCast(b);
        }
    };
}
/// See `Streams.Buffer` for convenience API to obtain the `streams` parameter.
/// Initializes one reader per file and queues an initial read for each.
/// NOTE(review): assumes `files.len == streams.len` and at least one stream —
/// confirm with callers.
pub fn init(mr: *MultiReader, gpa: Allocator, io: Io, streams: *Streams, files: []const File) void {
    const contexts = streams.contexts();
    for (contexts, files) |*context, file| context.* = .{
        .mr = mr,
        .fr = .{
            .io = io,
            .file = file,
            .mode = .streaming,
            .interface = .{
                .vtable = &.{
                    .stream = stream,
                    .discard = discard,
                    .readVec = readVec,
                    .rebase = rebase,
                },
                .buffer = &.{},
                .seek = 0,
                .end = 0,
            },
        },
        .vec = .{&.{}},
        .err = null,
    };
    mr.* = .{
        .gpa = gpa,
        .streams = streams,
        .batch = .init(streams.storage()),
    };
    for (contexts, 0..) |*context, i| {
        const r = &context.fr.interface;
        // Allocate an initial buffer; allocation failure is recorded on the
        // individual stream rather than failing the whole init.
        rebaseGrowing(mr, context, 1) catch |err| {
            context.err = err;
            continue;
        };
        context.vec[0] = r.buffer;
        // Queue the first read for this stream.
        mr.batch.addAt(@intCast(i), .{ .file_read_streaming = .{
            .file = context.fr.file,
            .data = &context.vec,
        } });
    }
}
/// Cancels all in-flight reads, then frees each stream's buffer.
/// The `Streams` storage itself is caller-owned and is not freed here.
pub fn deinit(mr: *MultiReader) void {
    const gpa = mr.gpa;
    const contexts = mr.streams.contexts();
    // NOTE(review): indexes contexts[0] for `io`; assumes at least one stream.
    const io = contexts[0].fr.io;
    mr.batch.cancel(io);
    for (contexts) |*context| {
        gpa.free(context.fr.interface.buffer);
    }
}
/// Returns the `File.Reader` for the stream at `index`.
pub fn fileReader(mr: *MultiReader, index: usize) *File.Reader {
    const context = &mr.streams.contexts()[index];
    return &context.fr;
}
/// Returns the generic `Io.Reader` interface for the stream at `index`.
pub fn reader(mr: *MultiReader, index: usize) *Io.Reader {
    const context = &mr.streams.contexts()[index];
    return &context.fr.interface;
}
/// Checks for errors in all streams, prioritizing `error.Canceled` if it
/// occurred anywhere, and ignoring `error.EndOfStream`. When several other
/// errors exist, the one on the highest-indexed stream is reported.
pub fn checkAnyError(mr: *const MultiReader) UnendingError!void {
    var pending: UnendingError!void = {};
    for (mr.streams.contexts()) |*context| {
        const err = context.err orelse continue;
        switch (err) {
            error.Canceled => |e| return e,
            error.EndOfStream => {},
            else => |e| pending = e,
        }
    }
    return pending;
}
/// Takes ownership of the buffered data for stream `index`, leaving that
/// stream's buffer empty. Caller owns the returned slice.
///
/// NOTE(review): a queued read on this stream may still target the old
/// buffer; presumably this is only called once the stream has finished —
/// confirm with callers.
pub fn toOwnedSlice(mr: *MultiReader, index: usize) Allocator.Error![]u8 {
    const gpa = mr.gpa;
    const r: *Io.Reader = reader(mr, index);
    if (r.seek == 0) {
        // Data already starts at the buffer base; shrink the allocation in place.
        const new = try gpa.realloc(r.buffer, r.end);
        r.buffer = &.{};
        r.end = 0;
        return new;
    }
    // Data starts mid-buffer; copy out only the buffered portion.
    const new = try gpa.dupe(u8, r.buffered());
    gpa.free(r.buffer);
    r.buffer = &.{};
    r.seek = 0;
    r.end = 0;
    return new;
}
/// `Io.Reader` vtable implementation. Data arrives via `fill` directly into
/// the buffer, so this only waits for progress and reports no direct transfer.
fn stream(r: *Io.Reader, w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!usize {
    _ = w;
    _ = limit;
    const file_reader: *File.Reader = @alignCast(@fieldParentPtr("interface", r));
    try fillUntimed(@fieldParentPtr("fr", file_reader), 1);
    return 0;
}
/// `Io.Reader` vtable implementation; see `stream` for why this returns 0.
fn discard(r: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize {
    _ = limit;
    const file_reader: *File.Reader = @alignCast(@fieldParentPtr("interface", r));
    try fillUntimed(@fieldParentPtr("fr", file_reader), 1);
    return 0;
}
/// `Io.Reader` vtable implementation; see `stream` for why this returns 0.
fn readVec(r: *Io.Reader, data: [][]u8) Io.Reader.Error!usize {
    _ = data;
    const file_reader: *File.Reader = @alignCast(@fieldParentPtr("interface", r));
    try fillUntimed(@fieldParentPtr("fr", file_reader), 1);
    return 0;
}
/// `Io.Reader` vtable implementation: waits until `capacity` bytes can be
/// buffered without rebasing.
fn rebase(r: *Io.Reader, capacity: usize) Io.Reader.RebaseError!void {
    const file_reader: *File.Reader = @alignCast(@fieldParentPtr("interface", r));
    try fillUntimed(@fieldParentPtr("fr", file_reader), capacity);
}
/// Calls `fill` without a timeout, translating batch-level failures into
/// this stream's sticky `err` plus the `error.ReadFailed` that the
/// `Io.Reader` interface requires.
fn fillUntimed(context: *Context, capacity: usize) Io.Reader.Error!void {
    fill(context.mr, capacity, .none) catch |err| switch (err) {
        // With a `.none` timeout, these cannot occur.
        error.Timeout, error.UnsupportedClock => unreachable,
        error.Canceled, error.ConcurrencyUnavailable => |e| {
            context.err = e;
            return error.ReadFailed;
        },
        error.EndOfStream => |e| return e,
    };
    // Surface an error previously recorded on this particular stream.
    if (context.err) |err| switch (err) {
        error.EndOfStream => |e| return e,
        else => return error.ReadFailed,
    };
}
pub const FillError = Io.Batch.AwaitConcurrentError || error{
/// `fill` was called when all streams already have failed or reached the
/// end.
EndOfStream,
};
/// Wait until at least one stream receives more data.
/// Completed reads are drained from the batch; each successful stream has its
/// buffer advanced, grown to fit `unused_capacity` more bytes if needed, and
/// its next read re-queued. Failed streams record a sticky error and are not
/// re-queued.
pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillError!void {
    const contexts = mr.streams.contexts();
    // NOTE(review): indexes contexts[0] for `io`; assumes at least one stream.
    const io = contexts[0].fr.io;
    var any_completed = false;
    try mr.batch.awaitConcurrent(io, timeout);
    while (mr.batch.next()) |operation| {
        any_completed = true;
        const context = &contexts[operation.index];
        const n = operation.result.file_read_streaming catch |err| {
            // Record the failure; this stream is not re-queued.
            context.err = err;
            continue;
        };
        const r = &context.fr.interface;
        r.end += n;
        // Grow or compact the buffer if it cannot accept `unused_capacity` more.
        if (r.buffer.len - r.end < unused_capacity) {
            rebaseGrowing(mr, context, r.bufferedLen() + unused_capacity) catch |err| {
                context.err = err;
                continue;
            };
            assert(r.seek == 0);
        }
        // Aim the read vector at the free space and queue the next read.
        context.vec[0] = r.buffer[r.end..];
        mr.batch.addAt(operation.index, .{ .file_read_streaming = .{
            .file = context.fr.file,
            .data = &context.vec,
        } });
    }
    if (!any_completed) return error.EndOfStream;
}
/// Wait until all streams fail or reach the end.
pub fn fillRemaining(mr: *MultiReader, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void {
    while (true) {
        fill(mr, 1, timeout) catch |err| switch (err) {
            // All streams exhausted: done.
            error.EndOfStream => return,
            else => |e| return e,
        };
    }
}
/// Ensures the stream's buffer can hold at least `capacity` total bytes,
/// compacting in place when it is already large enough and reallocating
/// otherwise. On return, `seek == 0` and buffered data starts at the base.
fn rebaseGrowing(mr: *MultiReader, context: *Context, capacity: usize) Allocator.Error!void {
    const gpa = mr.gpa;
    const r = &context.fr.interface;
    if (r.buffer.len >= capacity) {
        // Already large enough: slide the live data to the front.
        const data = r.buffer[r.seek..r.end];
        @memmove(r.buffer[0..data.len], data);
        r.seek = 0;
        r.end = data.len;
    } else {
        const adjusted_capacity = std.ArrayList(u8).growCapacity(capacity);
        if (r.seek == 0) {
            // Try growing in place to avoid copying.
            if (gpa.remap(r.buffer, adjusted_capacity)) |new_memory| {
                r.buffer = new_memory;
                return;
            }
        }
        // Allocate fresh memory, copy the live data, and free the old buffer.
        const data = r.buffer[r.seek..r.end];
        const new = try gpa.alloc(u8, adjusted_capacity);
        @memcpy(new[0..data.len], data);
        gpa.free(r.buffer);
        r.buffer = new;
        r.seek = 0;
        r.end = data.len;
    }
}
+19 -35
View File
@@ -26,27 +26,7 @@ size_err: ?SizeError = null,
seek_err: ?SeekError = null,
interface: Io.Reader,
pub const Error = error{
InputOutput,
SystemResources,
/// Trying to read a directory file descriptor as if it were a file.
IsDir,
BrokenPipe,
ConnectionResetByPeer,
/// File was not opened with read capability.
NotOpenForReading,
SocketUnconnected,
/// Non-blocking has been enabled, and reading from the file descriptor
/// would block.
WouldBlock,
/// In WASI, this error occurs when the file descriptor does
/// not hold the required rights to read from it.
AccessDenied,
/// Unable to read file due to lock. Depending on the `Io` implementation,
/// reading from a locked file may return this error, or may ignore the
/// lock.
LockViolation,
} || Io.Cancelable || Io.UnexpectedError;
pub const Error = Io.Operation.FileReadStreaming.UnendingError || Io.Cancelable;
pub const SizeError = File.StatError || error{
/// Occurs if, for example, the file handle is a network socket and therefore does not have a size.
@@ -300,14 +280,16 @@ fn readVecStreaming(r: *Reader, data: [][]u8) Io.Reader.Error!usize {
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = io.vtable.fileReadStreaming(io.userdata, r.file, dest) catch |err| {
r.err = err;
return error.ReadFailed;
const n = r.file.readStreaming(io, dest) catch |err| switch (err) {
error.EndOfStream => {
r.size = r.pos;
return error.EndOfStream;
},
else => |e| {
r.err = e;
return error.ReadFailed;
},
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
if (n > data_size) {
r.interface.end += n - data_size;
@@ -355,14 +337,16 @@ fn discard(io_reader: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize {
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, &data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = io.vtable.fileReadStreaming(io.userdata, file, dest) catch |err| {
r.err = err;
return error.ReadFailed;
const n = file.readStreaming(io, dest) catch |err| switch (err) {
error.EndOfStream => {
r.size = r.pos;
return error.EndOfStream;
},
else => |e| {
r.err = e;
return error.ReadFailed;
},
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
if (n > data_size) {
r.interface.end += n - data_size;
+24 -5
View File
@@ -127,9 +127,7 @@ pub const ShortError = error{
ReadFailed,
};
pub const RebaseError = error{
EndOfStream,
};
pub const RebaseError = Error;
pub const failing: Reader = .{
.vtable = &.{
@@ -315,6 +313,27 @@ pub fn allocRemainingAlignedSentinel(
}
}
pub const AppendExactError = Allocator.Error || Error;

/// Transfers exactly `n` bytes from the reader to the `ArrayList`.
///
/// See also:
/// * `appendRemaining`
pub fn appendExact(
    r: *Reader,
    gpa: Allocator,
    list: *ArrayList(u8),
    n: usize,
) AppendExactError!void {
    try list.ensureUnusedCapacity(gpa, n);
    var a = std.Io.Writer.Allocating.fromArrayList(gpa, list);
    // Hand the list back even on error so the caller's list stays valid.
    defer list.* = a.toArrayList();
    streamExact(r, &a.writer, n) catch |err| switch (err) {
        error.ReadFailed, error.EndOfStream => |e| return e,
        // Capacity for `n` bytes was reserved above, so the allocating
        // writer cannot fail while receiving exactly `n` bytes.
        error.WriteFailed => unreachable,
    };
}
/// Transfers all bytes from the current position to the end of the stream, up
/// to `limit`, appending them to `list`.
///
@@ -1381,7 +1400,7 @@ pub fn takeLeb128(r: *Reader, comptime T: type) TakeLeb128Error!T {
}
/// Ensures `capacity` data can be buffered without rebasing.
pub fn rebase(r: *Reader, capacity: usize) RebaseError!void {
pub fn rebase(r: *Reader, capacity: usize) Error!void {
if (r.buffer.len - r.seek >= capacity) {
@branchHint(.likely);
return;
@@ -1389,7 +1408,7 @@ pub fn rebase(r: *Reader, capacity: usize) RebaseError!void {
return r.vtable.rebase(r, capacity);
}
pub fn defaultRebase(r: *Reader, capacity: usize) RebaseError!void {
pub fn defaultRebase(r: *Reader, capacity: usize) Error!void {
assert(r.buffer.len - r.seek < capacity);
const data = r.buffer[r.seek..r.end];
@memmove(r.buffer[0..data.len], data);
+702 -93
View File
@@ -1255,6 +1255,32 @@ const AlertableSyscall = struct {
assert(is_windows);
}
/// Marks the current thread as blocked in an alertable syscall, or returns
/// `error.Canceled` if a cancelation request is already pending.
fn start() Io.Cancelable!AlertableSyscall {
    // Not a Threaded-managed thread: nothing to coordinate.
    const thread = Thread.current orelse return .{ .thread = null };
    switch (thread.cancel_protection) {
        .blocked => return .{ .thread = null },
        .unblocked => {},
    }
    // Atomically set bit 0b010 of the cancelation field in the status word.
    // NOTE(review): assumes 0b010 maps `.none` -> `.blocked_alertable` and
    // leaves `.canceling`/`.canceled` unchanged — confirm against the
    // cancelation enum's bit layout.
    const old_status = thread.status.fetchOr(.{
        .cancelation = @enumFromInt(0b010),
        .awaitable = .null,
    }, .monotonic);
    switch (old_status.cancelation) {
        // A thread cannot already be parked or blocked while entering a syscall.
        .parked => unreachable,
        .blocked => unreachable,
        .blocked_alertable => unreachable,
        .blocked_canceling => unreachable,
        .blocked_alertable_canceling => unreachable,
        .none => return .{ .thread = thread }, // new status is `.blocked_alertable`
        .canceling => {
            // Status is unchanged (still `.canceling`)---change to `.canceled` before return.
            thread.status.store(.{ .cancelation = .canceled, .awaitable = old_status.awaitable }, .monotonic);
            return error.Canceled;
        },
        .canceled => return .{ .thread = null }, // new status is `.canceled` (unchanged)
    }
}
fn checkCancel(s: AlertableSyscall) Io.Cancelable!void {
comptime assert(is_windows);
const thread = s.thread orelse return;
@@ -1314,8 +1340,17 @@ const AlertableSyscall = struct {
}
};
/// Sleeps alertably until a user APC or alert is delivered to this thread.
fn waitForApcOrAlert() void {
    // NOTE: NT interprets negative timeouts as relative intervals, so minInt
    // acts as an effectively infinite sleep; TRUE makes the wait alertable.
    const infinite_timeout: windows.LARGE_INTEGER = std.math.minInt(windows.LARGE_INTEGER);
    _ = windows.ntdll.NtDelayExecution(windows.TRUE, &infinite_timeout);
}
const max_iovecs_len = 8;
const splat_buffer_size = 64;
/// Happens to be the same number that matches maximum number of handles that
/// NtWaitForMultipleObjects accepts. We use this value also for poll() on
/// posix systems.
const poll_buffer_len = 64;
const default_PATH = "/usr/local/bin:/bin/:/usr/bin";
comptime {
@@ -1579,6 +1614,11 @@ pub fn io(t: *Threaded) Io {
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
.operate = operate,
.batchAwaitAsync = batchAwaitAsync,
.batchAwaitConcurrent = batchAwaitConcurrent,
.batchCancel = batchCancel,
.dirCreateDir = dirCreateDir,
.dirCreateDirPath = dirCreateDirPath,
.dirCreateDirPathOpen = dirCreateDirPathOpen,
@@ -1613,7 +1653,6 @@ pub fn io(t: *Threaded) Io {
.fileWritePositional = fileWritePositional,
.fileWriteFileStreaming = fileWriteFileStreaming,
.fileWriteFilePositional = fileWriteFilePositional,
.fileReadStreaming = fileReadStreaming,
.fileReadPositional = fileReadPositional,
.fileSeekBy = fileSeekBy,
.fileSeekTo = fileSeekTo,
@@ -1739,6 +1778,11 @@ pub fn ioBasic(t: *Threaded) Io {
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
.operate = operate,
.batchAwaitAsync = batchAwaitAsync,
.batchAwaitConcurrent = batchAwaitConcurrent,
.batchCancel = batchCancel,
.dirCreateDir = dirCreateDir,
.dirCreateDirPath = dirCreateDirPath,
.dirCreateDirPathOpen = dirCreateDirPathOpen,
@@ -1773,7 +1817,6 @@ pub fn ioBasic(t: *Threaded) Io {
.fileWritePositional = fileWritePositional,
.fileWriteFileStreaming = fileWriteFileStreaming,
.fileWriteFilePositional = fileWriteFilePositional,
.fileReadStreaming = fileReadStreaming,
.fileReadPositional = fileReadPositional,
.fileSeekBy = fileSeekBy,
.fileSeekTo = fileSeekTo,
@@ -2440,6 +2483,485 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void {
Thread.futexWake(ptr, max_waiters);
}
/// Implements `Io.VTable.operate`: performs the single operation
/// synchronously on the calling thread.
fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Operation.Result {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    switch (operation) {
        .file_read_streaming => |o| return .{
            // `error.Canceled` fails the whole call; every other error is
            // delivered inside the per-operation result union.
            .file_read_streaming = fileReadStreaming(t, o.file, o.data) catch |err| switch (err) {
                error.Canceled => |e| return e,
                else => |e| e,
            },
        },
    }
}
/// Blocks until at least one operation submitted to `b` completes (or until
/// all remaining submissions have been executed directly), moving finished
/// operations from the submission queue onto the completion queue.
fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (is_windows) {
        // Submit everything to the kernel; completions are delivered via APC
        // (`batchApc`), which moves entries from pending to completions.
        batchAwaitWindows(b, false) catch |err| switch (err) {
            error.ConcurrencyUnavailable => unreachable, // passed concurrency=false
            else => |e| return e,
        };
        const alertable_syscall = try AlertableSyscall.start();
        // Sleep alertably until at least one APC has delivered a completion.
        while (b.pending.head != .none and b.completions.head == .none) waitForApcOrAlert();
        alertable_syscall.finish();
        return;
    }
    if (native_os == .wasi and !builtin.link_libc) @panic("TODO");
    // Build one pollfd per submission, capped at the stack buffer size.
    // Submissions beyond `poll_buffer_len` stay queued for a later call.
    var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
    var poll_len: u32 = 0;
    {
        var index = b.submissions.head;
        while (index != .none and poll_len < poll_buffer_len) {
            const submission = &b.storage[index.toIndex()].submission;
            switch (submission.operation) {
                .file_read_streaming => |o| {
                    poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 };
                    poll_len += 1;
                },
            }
            index = submission.node.next;
        }
    }
    switch (poll_len) {
        // Nothing submitted.
        0 => return,
        // A single operation needs no readiness multiplexing; fall through to
        // the direct blocking path at the bottom of the function.
        1 => {},
        else => while (true) {
            const timeout_ms: i32 = t: {
                if (b.completions.head != .none) {
                    // It is legal to call batchWait with already completed
                    // operations in the ring. In such case, we need to avoid
                    // blocking in the poll syscall, but we can still take this
                    // opportunity to find additional ready operations.
                    break :t 0;
                }
                const max_poll_ms = std.math.maxInt(i32);
                break :t max_poll_ms;
            };
            const syscall = try Syscall.start();
            const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms);
            syscall.finish();
            switch (posix.errno(rc)) {
                .SUCCESS => {
                    if (rc == 0) {
                        if (b.completions.head != .none) {
                            // Since there are already completions available in the
                            // queue, this is neither a timeout nor a case for
                            // retrying.
                            return;
                        }
                        continue;
                    }
                    // Walk the submission list in lockstep with the pollfd
                    // array; execute each ready operation and splice it from
                    // the singly linked submission list onto the completion
                    // queue.
                    var prev_index: Io.Operation.OptionalIndex = .none;
                    var index = b.submissions.head;
                    for (poll_buffer[0..poll_len]) |poll_entry| {
                        const storage = &b.storage[index.toIndex()];
                        const submission = &storage.submission;
                        // Read before `storage` is repurposed below.
                        const next_index = submission.node.next;
                        if (poll_entry.revents != 0) {
                            // The fd reported readiness, so this read is
                            // expected not to block for long.
                            const result = try operate(t, submission.operation);
                            // Unlink from submissions.
                            switch (prev_index) {
                                .none => b.submissions.head = next_index,
                                else => b.storage[prev_index.toIndex()].submission.node.next = next_index,
                            }
                            if (next_index == .none) b.submissions.tail = prev_index;
                            // Append to completions.
                            switch (b.completions.tail) {
                                .none => b.completions.head = index,
                                else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
                            }
                            storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
                            b.completions.tail = index;
                        } else prev_index = index;
                        index = next_index;
                    }
                    assert(index == .none);
                    return;
                },
                .INTR => continue,
                // On poll failure, fall back to executing the operations
                // directly (blocking) below.
                else => break,
            }
        },
    }
    {
        // Direct path (single submission, or poll failed): execute every
        // remaining submission synchronously, in order, appending each
        // result to the completion queue.
        var tail_index = b.completions.tail;
        defer b.completions.tail = tail_index;
        var index = b.submissions.head;
        // On cancellation, record how far we got so that already-converted
        // entries are not re-run and the rest stay queued.
        errdefer b.submissions.head = index;
        while (index != .none) {
            const storage = &b.storage[index.toIndex()];
            const submission = &storage.submission;
            const next_index = submission.node.next;
            const result = try operate(t, submission.operation);
            switch (tail_index) {
                .none => b.completions.head = index,
                else => b.storage[tail_index.toIndex()].completion.node.next = index,
            }
            storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
            tail_index = index;
            index = next_index;
        }
        b.submissions = .{ .head = .none, .tail = .none };
    }
}
/// Waits until at least one operation submitted to `b` completes or the given
/// `timeout` expires (`error.Timeout`). Unlike `batchAwaitAsync`, operations
/// must be awaitable concurrently; when that cannot be provided,
/// `error.ConcurrencyUnavailable` is returned.
fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (is_windows) {
        const deadline: ?Io.Clock.Timestamp = timeout.toDeadline(ioBasic(t)) catch |err| switch (err) {
            error.Unexpected => deadline: {
                recoverableOsBugDetected();
                // Degrade to an immediately-elapsed deadline rather than fail.
                break :deadline .{ .raw = .{ .nanoseconds = 0 }, .clock = .awake };
            },
            error.UnsupportedClock => |e| return e,
        };
        try batchAwaitWindows(b, true);
        // Sleep alertably until an APC delivers a completion or the deadline
        // elapses.
        while (b.pending.head != .none and b.completions.head == .none) {
            var delay_interval: windows.LARGE_INTEGER = interval: {
                const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER);
                break :interval t.deadlineToWindowsInterval(d) catch |err| switch (err) {
                    error.UnsupportedClock => |e| return e,
                    error.Unexpected => {
                        recoverableOsBugDetected();
                        break :interval -1;
                    },
                };
            };
            const alertable_syscall = try AlertableSyscall.start();
            const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval);
            alertable_syscall.finish();
            switch (delay_rc) {
                .SUCCESS, .TIMEOUT => {
                    // The thread woke due to the timeout. Although spurious
                    // timeouts are OK, when no deadline is passed we must not
                    // return `error.Timeout`.
                    if (timeout != .none and b.completions.head == .none) return error.Timeout;
                },
                else => {},
            }
        }
        return;
    }
    if (native_os == .wasi and !builtin.link_libc) @panic("TODO");
    // pollfds live in this stack buffer until it overflows, at which point
    // they migrate to a heap allocation cached in `b.context` (freed by
    // `batchCancel`).
    var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
    var poll_storage: struct {
        gpa: std.mem.Allocator,
        b: *Io.Batch,
        slice: []posix.pollfd,
        len: u32,
        /// Appends one pollfd, growing into the heap allocation on overflow.
        fn add(storage: *@This(), file: Io.File, events: @FieldType(posix.pollfd, "events")) Io.ConcurrentError!void {
            const len = storage.len;
            if (len == poll_buffer_len) {
                const slice: []posix.pollfd = if (storage.b.context) |context|
                    @as([*]posix.pollfd, @ptrCast(@alignCast(context)))[0..storage.b.storage.len]
                else allocation: {
                    const allocation = storage.gpa.alloc(posix.pollfd, storage.b.storage.len) catch
                        return error.ConcurrencyUnavailable;
                    storage.b.context = allocation.ptr;
                    break :allocation allocation;
                };
                @memcpy(slice[0..poll_buffer_len], storage.slice);
                // Fix: retarget at the heap allocation so the store below
                // does not index one past the end of the stack buffer.
                storage.slice = slice;
            }
            storage.slice[len] = .{
                .fd = file.handle,
                .events = events,
                .revents = 0,
            };
            storage.len = len + 1;
        }
    } = .{ .gpa = t.allocator, .b = b, .slice = &poll_buffer, .len = 0 };
    {
        var index = b.submissions.head;
        while (index != .none) {
            const submission = &b.storage[index.toIndex()].submission;
            switch (submission.operation) {
                .file_read_streaming => |o| try poll_storage.add(o.file, posix.POLL.IN),
            }
            index = submission.node.next;
        }
    }
    switch (poll_storage.len) {
        0 => return,
        1 => if (timeout == .none) {
            // A single operation with no timeout can simply be executed
            // directly (blocking), skipping poll entirely.
            const index = b.submissions.head;
            const storage = &b.storage[index.toIndex()];
            const result = try operate(t, storage.submission.operation);
            b.submissions = .{ .head = .none, .tail = .none };
            switch (b.completions.tail) {
                .none => b.completions.head = index,
                else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
            }
            storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
            b.completions.tail = index;
            return;
        },
        else => {},
    }
    const t_io = ioBasic(t);
    const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock;
    while (true) {
        const timeout_ms: i32 = t: {
            if (b.completions.head != .none) {
                // It is legal to call batchWait with already completed
                // operations in the ring. In such case, we need to avoid
                // blocking in the poll syscall, but we can still take this
                // opportunity to find additional ready operations.
                break :t 0;
            }
            const d = deadline orelse break :t -1;
            const duration = d.durationFromNow(t_io) catch return error.UnsupportedClock;
            if (duration.raw.nanoseconds <= 0) return error.Timeout;
            const max_poll_ms = std.math.maxInt(i32);
            break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
        };
        const syscall = try Syscall.start();
        // Fix: poll the active slice, which may be the heap allocation rather
        // than the (possibly overflowed) stack buffer.
        const rc = posix.system.poll(poll_storage.slice.ptr, poll_storage.len, timeout_ms);
        syscall.finish();
        switch (posix.errno(rc)) {
            .SUCCESS => {
                if (rc == 0) {
                    if (b.completions.head != .none) {
                        // Since there are already completions available in the
                        // queue, this is neither a timeout nor a case for
                        // retrying.
                        return;
                    }
                    // Although spurious timeouts are OK, when no deadline is
                    // passed we must not return `error.Timeout`.
                    if (deadline == null) continue;
                    return error.Timeout;
                }
                // Execute each ready operation and splice it from the
                // submission list onto the completion queue.
                var prev_index: Io.Operation.OptionalIndex = .none;
                var index = b.submissions.head;
                for (poll_storage.slice[0..poll_storage.len]) |poll_entry| {
                    const submission = &b.storage[index.toIndex()].submission;
                    const next_index = submission.node.next;
                    if (poll_entry.revents != 0) {
                        const result = try operate(t, submission.operation);
                        switch (prev_index) {
                            .none => b.submissions.head = next_index,
                            else => b.storage[prev_index.toIndex()].submission.node.next = next_index,
                        }
                        if (next_index == .none) b.submissions.tail = prev_index;
                        switch (b.completions.tail) {
                            .none => b.completions.head = index,
                            else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
                        }
                        b.completions.tail = index;
                        b.storage[index.toIndex()] = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
                    } else prev_index = index;
                    index = next_index;
                }
                assert(index == .none);
                return;
            },
            .INTR => continue,
            else => return error.ConcurrencyUnavailable,
        }
    }
}
/// Per-operation state kept inside `Io.Operation.Storage.Pending.context`
/// while an NT operation is in flight: the file handle (needed later by
/// `NtCancelIoFileEx` in `batchCancel`) and the IO_STATUS_BLOCK the kernel
/// writes the operation's outcome into. `batchApc` recovers the pending
/// entry from the IO_STATUS_BLOCK address via `@fieldParentPtr`.
const WindowsBatchPendingOperationContext = extern struct {
    file: windows.HANDLE,
    iosb: windows.IO_STATUS_BLOCK,
    // Type-erased representation matching the generic `context` field.
    const Erased = [3]usize;
    comptime {
        // NOTE(review): since instances of this struct live in memory
        // declared as `Erased`, the invariant one would expect is
        // @sizeOf(WindowsBatchPendingOperationContext) <= @sizeOf(Erased);
        // the two sizes appear equal on supported targets so both directions
        // hold, but confirm the intended direction of this assert.
        assert(@sizeOf(Erased) <= @sizeOf(WindowsBatchPendingOperationContext));
    }
    fn toErased(context: *WindowsBatchPendingOperationContext) *Erased {
        return @ptrCast(context);
    }
    fn fromErased(erased: *Erased) *WindowsBatchPendingOperationContext {
        return @ptrCast(erased);
    }
};
/// Cancels all not-yet-completed operations in `b` and releases any
/// per-batch resources held by this implementation.
fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    {
        // Move every queued-but-not-started submission onto the unused list.
        var tail_index = b.unused.tail;
        defer b.unused.tail = tail_index;
        var index = b.submissions.head;
        // NOTE(review): this function returns void and the loop body contains
        // no `try`, so this errdefer can never run — presumably mirrored from
        // the sibling await implementations; confirm whether it can be
        // dropped.
        errdefer b.submissions.head = index;
        while (index != .none) {
            const next_index = b.storage[index.toIndex()].submission.node.next;
            switch (tail_index) {
                .none => b.unused.head = index,
                else => b.storage[tail_index.toIndex()].unused.next = index,
            }
            b.storage[index.toIndex()] = .{ .unused = .{ .prev = tail_index, .next = .none } };
            tail_index = index;
            index = next_index;
        }
        b.submissions = .{ .head = .none, .tail = .none };
    }
    if (is_windows) {
        // Request cancellation of every in-flight NT operation, then drain
        // APCs (`batchApc`) until the pending list is empty.
        var index = b.pending.head;
        while (index != .none) {
            const pending = &b.storage[index.toIndex()].pending;
            const context: *WindowsBatchPendingOperationContext = .fromErased(&pending.context);
            var cancel_iosb: windows.IO_STATUS_BLOCK = undefined;
            _ = windows.ntdll.NtCancelIoFileEx(context.file, &context.iosb, &cancel_iosb);
            index = pending.node.next;
        }
        while (b.pending.head != .none) waitForApcOrAlert();
    } else if (b.context) |context| {
        // POSIX: `b.context` holds the heap pollfd slice allocated by
        // `batchAwaitConcurrent` when the stack buffer overflowed.
        t.allocator.free(@as([*]posix.pollfd, @ptrCast(@alignCast(context)))[0..b.storage.len]);
        b.context = null;
    }
    assert(b.pending.head == .none);
}
/// APC completion routine for batched NT operations. Unlinks the completed
/// operation from the doubly linked pending list, then appends it either to
/// the unused list (when cancelled — no result is produced) or to the
/// completion queue with its translated result. Also invoked directly to
/// deliver synthesized completions for synchronous failures.
fn batchApc(apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void {
    const b: *Io.Batch = @ptrCast(@alignCast(apc_context));
    // Recover the pending storage entry from the IO_STATUS_BLOCK address:
    // iosb -> context struct -> erased context field -> pending entry.
    const context: *WindowsBatchPendingOperationContext = @fieldParentPtr("iosb", iosb);
    const erased_context = context.toErased();
    const pending: *Io.Operation.Storage.Pending = @fieldParentPtr("context", erased_context);
    // Unlink from the pending list.
    switch (pending.node.prev) {
        .none => b.pending.head = pending.node.next,
        else => |prev_index| b.storage[prev_index.toIndex()].pending.node.next = pending.node.next,
    }
    switch (pending.node.next) {
        .none => b.pending.tail = pending.node.prev,
        else => |next_index| b.storage[next_index.toIndex()].pending.node.prev = pending.node.prev,
    }
    const storage: *Io.Operation.Storage = @fieldParentPtr("pending", pending);
    // Pointer arithmetic yields this entry's index within `b.storage`.
    const index = storage - b.storage.ptr;
    switch (iosb.u.Status) {
        .CANCELLED => {
            // Cancelled operations produce no result; recycle the slot onto
            // the unused list.
            const tail_index = b.unused.tail;
            switch (tail_index) {
                .none => b.unused.head = .fromIndex(index),
                else => b.storage[tail_index.toIndex()].unused.next = .fromIndex(index),
            }
            storage.* = .{ .unused = .{ .prev = tail_index, .next = .none } };
            b.unused.tail = .fromIndex(index);
        },
        else => {
            // Append to the completion queue and record the result translated
            // from the NTSTATUS. `pending.tag` is read before `storage` is
            // overwritten below.
            switch (b.completions.tail) {
                .none => b.completions.head = .fromIndex(index),
                else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = .fromIndex(index),
            }
            b.completions.tail = .fromIndex(index);
            const result: Io.Operation.Result = switch (pending.tag) {
                .file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) },
            };
            storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
        },
    }
}
/// Submits every queued operation in `b` to the kernel, moving each entry
/// from the submission list onto the pending list. Operations that complete
/// or fail synchronously are delivered straight to `batchApc`.
/// If `concurrency` is false, `error.ConcurrencyUnavailable` is unreachable.
fn batchAwaitWindows(b: *Io.Batch, concurrency: bool) error{ Canceled, ConcurrencyUnavailable }!void {
    var index = b.submissions.head;
    // On error, leave not-yet-submitted operations queued.
    // NOTE(review): the entry at `index` has already been moved off the
    // submission list (to pending, and via the inner errdefer to unused)
    // when this runs, yet the head is reset to its index — confirm the two
    // lists remain consistent on the error path.
    errdefer b.submissions.head = index;
    while (index != .none) {
        const storage = &b.storage[index.toIndex()];
        // Copy by value: `storage` is repurposed as a pending entry next.
        const submission = storage.submission;
        storage.* = .{ .pending = .{
            .node = .{ .prev = b.pending.tail, .next = .none },
            .tag = submission.operation,
            .context = undefined,
        } };
        // Append to the pending list.
        switch (b.pending.tail) {
            .none => b.pending.head = index,
            else => |tail_index| b.storage[tail_index.toIndex()].pending.node.next = index,
        }
        b.pending.tail = index;
        const context: *WindowsBatchPendingOperationContext = .fromErased(&storage.pending.context);
        // If submitting this operation fails, synthesize a CANCELLED
        // completion so the pending entry is unlinked and recycled.
        errdefer {
            context.iosb.u.Status = .CANCELLED;
            batchApc(b, &context.iosb, 0);
        }
        switch (submission.operation) {
            .file_read_streaming => |o| o: {
                // Skip leading empty buffers; a read with no non-empty buffer
                // completes immediately with zero bytes transferred.
                var data_index: usize = 0;
                while (o.data.len - data_index != 0 and o.data[data_index].len == 0) data_index += 1;
                if (o.data.len - data_index == 0) {
                    context.iosb = .{
                        .u = .{ .Status = .SUCCESS },
                        .Information = 0,
                    };
                    batchApc(b, &context.iosb, 0);
                    break :o;
                }
                const buffer = o.data[data_index];
                // NtReadFile takes a 32-bit length.
                const short_buffer_len = @min(std.math.maxInt(u32), buffer.len);
                if (o.file.flags.nonblocking) {
                    // Needed later by NtCancelIoFileEx in batchCancel.
                    context.file = o.file.handle;
                    switch (windows.ntdll.NtReadFile(
                        o.file.handle,
                        null, // event
                        &batchApc,
                        b,
                        &context.iosb,
                        buffer.ptr,
                        short_buffer_len,
                        null, // byte offset
                        null, // key
                    )) {
                        // Even on SUCCESS the APC is queued; completion is
                        // handled there.
                        .PENDING, .SUCCESS => {},
                        .CANCELLED => unreachable,
                        else => |status| {
                            // Synchronous failure: deliver it as a completion.
                            context.iosb.u.Status = status;
                            batchApc(b, &context.iosb, 0);
                        },
                    }
                } else {
                    // A blocking handle cannot be awaited concurrently.
                    if (concurrency) return error.ConcurrencyUnavailable;
                    const syscall: Syscall = try .start();
                    while (true) switch (windows.ntdll.NtReadFile(
                        o.file.handle,
                        null, // event
                        null, // APC routine
                        null, // APC context
                        &context.iosb,
                        buffer.ptr,
                        short_buffer_len,
                        null, // byte offset
                        null, // key
                    )) {
                        .PENDING => unreachable, // unrecoverable: wrong File nonblocking flag
                        .CANCELLED => {
                            try syscall.checkCancel();
                            continue;
                        },
                        else => |status| {
                            syscall.finish();
                            // Deliver the (possibly successful) result as a
                            // completion.
                            context.iosb.u.Status = status;
                            batchApc(b, &context.iosb, 0);
                            break;
                        },
                    };
                }
            },
        }
        index = submission.node.next;
    }
    b.submissions = .{ .head = .none, .tail = .none };
}
/// Writes `op` at the completion ring's tail slot and advances the tail
/// index, wrapping around the ring length.
fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void {
    const ring_len: u31 = @intCast(ring.len);
    const tail = complete_tail.*;
    ring[tail.index(ring_len)] = op;
    complete_tail.* = tail.next(ring_len);
}
const dirCreateDir = switch (native_os) {
.windows => dirCreateDirWindows,
.wasi => dirCreateDirWasi,
@@ -2759,8 +3281,10 @@ fn dirCreateDirPathOpenWasi(
fn dirStat(userdata: ?*anyopaque, dir: Dir) Dir.StatError!Dir.Stat {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const file: File = .{ .handle = dir.handle };
return fileStat(t, file);
return fileStat(t, .{
.handle = dir.handle,
.flags = .{ .nonblocking = false },
});
}
const dirStatFile = switch (native_os) {
@@ -3552,7 +4076,10 @@ fn dirCreateFilePosix(
}
}
return .{ .handle = fd };
return .{
.handle = fd,
.flags = .{ .nonblocking = false },
};
}
fn dirCreateFileWindows(
@@ -3682,7 +4209,10 @@ fn dirCreateFileWindows(
errdefer windows.CloseHandle(handle);
const exclusive = switch (flags.lock) {
.none => return .{ .handle = handle },
.none => return .{
.handle = handle,
.flags = .{ .nonblocking = false },
},
.shared => false,
.exclusive => true,
};
@@ -3702,7 +4232,10 @@ fn dirCreateFileWindows(
)) {
.SUCCESS => {
syscall.finish();
return .{ .handle = handle };
return .{
.handle = handle,
.flags = .{ .nonblocking = false },
};
},
.INSUFFICIENT_RESOURCES => return syscall.fail(error.SystemResources),
.LOCK_NOT_GRANTED => return syscall.fail(error.WouldBlock),
@@ -3751,7 +4284,10 @@ fn dirCreateFileWasi(
switch (wasi.path_open(dir.handle, lookup_flags, sub_path.ptr, sub_path.len, oflags, base, inheriting, fdflags, &fd)) {
.SUCCESS => {
syscall.finish();
return .{ .handle = fd };
return .{
.handle = fd,
.flags = .{ .nonblocking = false },
};
},
.INTR => {
try syscall.checkCancel();
@@ -3846,7 +4382,10 @@ fn dirCreateFileAtomic(
.SUCCESS => {
syscall.finish();
return .{
.file = .{ .handle = @intCast(rc) },
.file = .{
.handle = @intCast(rc),
.flags = .{ .nonblocking = false },
},
.file_basename_hex = 0,
.dest_sub_path = dest_path,
.file_open = true,
@@ -4054,7 +4593,10 @@ fn dirOpenFilePosix(
if (!flags.allow_directory) {
const is_dir = is_dir: {
const stat = fileStat(t, .{ .handle = fd }) catch |err| switch (err) {
const stat = fileStat(t, .{
.handle = fd,
.flags = .{ .nonblocking = false },
}) catch |err| switch (err) {
// The directory-ness is either unknown or unknowable
error.Streaming => break :is_dir false,
else => |e| return e,
@@ -4140,7 +4682,10 @@ fn dirOpenFilePosix(
}
}
return .{ .handle = fd };
return .{
.handle = fd,
.flags = .{ .nonblocking = false },
};
}
fn dirOpenFileWindows(
@@ -4273,7 +4818,10 @@ pub fn dirOpenFileWtf16(
errdefer w.CloseHandle(handle);
const exclusive = switch (flags.lock) {
.none => return .{ .handle = handle },
.none => return .{
.handle = handle,
.flags = .{ .nonblocking = false },
},
.shared => false,
.exclusive => true,
};
@@ -4296,7 +4844,10 @@ pub fn dirOpenFileWtf16(
.ACCESS_VIOLATION => |err| return syscall.ntstatusBug(err), // bad io_status_block pointer
else => |status| return syscall.unexpectedNtstatus(status),
};
return .{ .handle = handle };
return .{
.handle = handle,
.flags = .{ .nonblocking = false },
};
}
fn dirOpenFileWasi(
@@ -4378,7 +4929,7 @@ fn dirOpenFileWasi(
if (!flags.allow_directory) {
const is_dir = is_dir: {
const stat = fileStat(t, .{ .handle = fd }) catch |err| switch (err) {
const stat = fileStat(t, .{ .handle = fd, .flags = .{ .nonblocking = false } }) catch |err| switch (err) {
// The directory-ness is either unknown or unknowable
error.Streaming => break :is_dir false,
else => |e| return e,
@@ -4388,7 +4939,10 @@ fn dirOpenFileWasi(
if (is_dir) return error.IsDir;
}
return .{ .handle = fd };
return .{
.handle = fd,
.flags = .{ .nonblocking = false },
};
}
const dirOpenDir = switch (native_os) {
@@ -5277,7 +5831,7 @@ fn dirRealPathFileWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8,
fn realPathWindows(h_file: windows.HANDLE, out_buffer: []u8) File.RealPathError!usize {
var wide_buf: [windows.PATH_MAX_WIDE]u16 = undefined;
// TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks
// TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks
try Thread.checkCancel();
const wide_slice = try windows.GetFinalPathNameByHandle(h_file, .{}, &wide_buf);
@@ -8275,14 +8829,14 @@ fn fileClose(userdata: ?*anyopaque, files: []const File) void {
for (files) |file| posix.close(file.handle);
}
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.Reader.Error!usize {
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.ReadStreamingError!usize {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
if (is_windows) return fileReadStreamingWindows(file, data);
return fileReadStreamingPosix(file, data);
}
fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usize {
fn fileReadStreamingPosix(file: File, data: []const []u8) File.ReadStreamingError!usize {
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
var i: usize = 0;
for (data) |buf| {
@@ -8303,28 +8857,24 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz
switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) {
.SUCCESS => {
syscall.finish();
if (nread == 0) return error.EndOfStream;
return nread;
},
.INTR, .TIMEDOUT => {
try syscall.checkCancel();
continue;
},
else => |e| {
syscall.finish();
switch (e) {
.INVAL => |err| return errnoBug(err),
.FAULT => |err| return errnoBug(err),
.BADF => return error.IsDir, // File operation on directory.
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.NOTCAPABLE => return error.AccessDenied,
else => |err| return posix.unexpectedErrno(err),
}
},
.BADF => return syscall.fail(error.IsDir), // File operation on directory.
.IO => return syscall.fail(error.InputOutput),
.ISDIR => return syscall.fail(error.IsDir),
.NOBUFS => return syscall.fail(error.SystemResources),
.NOMEM => return syscall.fail(error.SystemResources),
.NOTCONN => return syscall.fail(error.SocketUnconnected),
.CONNRESET => return syscall.fail(error.ConnectionResetByPeer),
.NOTCAPABLE => return syscall.fail(error.AccessDenied),
.INVAL => |err| return syscall.errnoBug(err),
.FAULT => |err| return syscall.errnoBug(err),
else => |err| return syscall.unexpectedErrno(err),
}
}
}
@@ -8335,75 +8885,115 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz
switch (posix.errno(rc)) {
.SUCCESS => {
syscall.finish();
if (rc == 0) return error.EndOfStream;
return @intCast(rc);
},
.INTR, .TIMEDOUT => {
try syscall.checkCancel();
continue;
},
else => |e| {
.BADF => {
syscall.finish();
switch (e) {
.INVAL => |err| return errnoBug(err),
.FAULT => |err| return errnoBug(err),
.AGAIN => return error.WouldBlock,
.BADF => {
if (native_os == .wasi) return error.IsDir; // File operation on directory.
return error.NotOpenForReading;
},
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
else => |err| return posix.unexpectedErrno(err),
}
if (native_os == .wasi) return error.IsDir; // File operation on directory.
return error.NotOpenForReading;
},
.AGAIN => return syscall.fail(error.WouldBlock),
.IO => return syscall.fail(error.InputOutput),
.ISDIR => return syscall.fail(error.IsDir),
.NOBUFS => return syscall.fail(error.SystemResources),
.NOMEM => return syscall.fail(error.SystemResources),
.NOTCONN => return syscall.fail(error.SocketUnconnected),
.CONNRESET => return syscall.fail(error.ConnectionResetByPeer),
.INVAL => |err| return syscall.errnoBug(err),
.FAULT => |err| return syscall.errnoBug(err),
else => |err| return syscall.unexpectedErrno(err),
}
}
}
fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!usize {
const DWORD = windows.DWORD;
fn fileReadStreamingWindows(file: File, data: []const []u8) File.ReadStreamingError!usize {
var index: usize = 0;
while (index < data.len and data[index].len == 0) index += 1;
if (index == data.len) return 0;
while (data.len - index != 0 and data[index].len == 0) index += 1;
if (data.len - index == 0) return 0;
const buffer = data[index];
const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len);
const short_buffer_len = @min(std.math.maxInt(u32), buffer.len);
const syscall: Syscall = try .start();
while (true) {
var n: DWORD = undefined;
if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, null) != 0) {
syscall.finish();
return n;
}
switch (windows.GetLastError()) {
.IO_PENDING => |err| {
syscall.finish();
return windows.errorBug(err);
},
.OPERATION_ABORTED => {
var iosb: windows.IO_STATUS_BLOCK = undefined;
if (!file.flags.nonblocking) {
const syscall: Syscall = try .start();
while (true) switch (windows.ntdll.NtReadFile(
file.handle,
null, // event
null, // APC routine
null, // APC context
&iosb,
buffer.ptr,
short_buffer_len,
null, // byte offset
null, // key
)) {
.PENDING => unreachable, // unrecoverable: wrong File nonblocking flag
.CANCELLED => {
try syscall.checkCancel();
continue;
},
.BROKEN_PIPE, .HANDLE_EOF => {
else => |status| {
syscall.finish();
return 0;
iosb.u.Status = status;
return ntReadFileResult(&iosb);
},
.NETNAME_DELETED => if (is_debug) unreachable else return error.Unexpected,
.LOCK_VIOLATION => return syscall.fail(error.LockViolation),
.ACCESS_DENIED => return syscall.fail(error.AccessDenied),
.INVALID_HANDLE => if (is_debug) unreachable else return error.Unexpected,
// TODO: Determine if INVALID_FUNCTION is possible in more scenarios than just passing
// a handle to a directory.
.INVALID_FUNCTION => return syscall.fail(error.IsDir),
else => |err| {
syscall.finish();
return windows.unexpectedError(err);
},
}
};
}
var done: bool = false;
switch (windows.ntdll.NtReadFile(
file.handle,
null, // event
flagApc,
&done, // APC context
&iosb,
buffer.ptr,
short_buffer_len,
null, // byte offset
null, // key
)) {
// We must wait for the APC routine.
.PENDING, .SUCCESS => while (!done) {
// Once we get here we must not return from the function until the
// operation completes, thereby releasing reference to io_status_block.
const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) {
error.Canceled => |e| {
var cancel_iosb: windows.IO_STATUS_BLOCK = undefined;
_ = windows.ntdll.NtCancelIoFileEx(file.handle, &iosb, &cancel_iosb);
while (!done) waitForApcOrAlert();
return e;
},
};
waitForApcOrAlert();
alertable_syscall.finish();
},
else => |status| iosb.u.Status = status,
}
return ntReadFileResult(&iosb);
}
/// APC completion routine that records completion by setting the boolean
/// pointed to by `userdata` to true.
fn flagApc(userdata: ?*anyopaque, _: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void {
    const done_flag: *bool = @ptrCast(userdata);
    done_flag.* = true;
}
/// Translates a completed NtReadFile IO_STATUS_BLOCK into either the number
/// of bytes transferred or an error.
fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize {
    return switch (io_status_block.u.Status) {
        // Callers invoke this only after the operation has finished, and
        // cancellation is handled before reaching here.
        .PENDING, .CANCELLED => unreachable,
        .SUCCESS => io_status_block.Information,
        .END_OF_FILE, .PIPE_BROKEN => error.EndOfStream,
        .INVALID_DEVICE_REQUEST => error.IsDir,
        .LOCK_NOT_GRANTED => error.LockViolation,
        .ACCESS_DENIED => error.AccessDenied,
        else => |status| windows.unexpectedStatus(status),
    };
}
@@ -9037,7 +9627,7 @@ fn processExecutablePath(userdata: ?*anyopaque, out_buffer: []u8) process.Execut
};
defer w.CloseHandle(h_file);
// TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks
// TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks
try Thread.checkCancel();
const wide_slice = try w.GetFinalPathNameByHandle(h_file, .{}, &path_name_w_buf.data);
@@ -9359,6 +9949,7 @@ fn writeFileStreamingWindows(
handle: windows.HANDLE,
bytes: []const u8,
) File.Writer.Error!usize {
assert(bytes.len != 0);
var bytes_written: windows.DWORD = undefined;
const adjusted_len = std.math.lossyCast(u32, bytes.len);
const syscall: Syscall = try .start();
@@ -10075,6 +10666,7 @@ fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (timeout == .none) return;
if (use_parking_sleep) return parking_sleep.sleep(try timeout.toDeadline(ioBasic(t)));
if (native_os == .wasi) return sleepWasi(t, timeout);
if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout);
@@ -12707,7 +13299,7 @@ fn processSetCurrentDir(userdata: ?*anyopaque, dir: Dir) process.SetCurrentDirEr
if (is_windows) {
var dir_path_buffer: [windows.PATH_MAX_WIDE]u16 = undefined;
// TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks
// TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks
try Thread.checkCancel();
const dir_path = try windows.GetFinalPathNameByHandle(dir.handle, .{}, &dir_path_buffer);
const path_len_bytes = std.math.cast(u16, dir_path.len * 2) orelse return error.NameTooLong;
@@ -13898,15 +14490,15 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
.pid = pid,
.err_fd = err_pipe[0],
.stdin = switch (options.stdin) {
.pipe => .{ .handle = stdin_pipe[1] },
.pipe => .{ .handle = stdin_pipe[1], .flags = .{ .nonblocking = false } },
else => null,
},
.stdout = switch (options.stdout) {
.pipe => .{ .handle = stdout_pipe[0] },
.pipe => .{ .handle = stdout_pipe[0], .flags = .{ .nonblocking = false } },
else => null,
},
.stderr = switch (options.stderr) {
.pipe => .{ .handle = stderr_pipe[0] },
.pipe => .{ .handle = stderr_pipe[0], .flags = .{ .nonblocking = false } },
else => null,
},
};
@@ -14560,9 +15152,9 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro
return .{
.id = piProcInfo.hProcess,
.thread_handle = piProcInfo.hThread,
.stdin = if (g_hChildStd_IN_Wr) |h| .{ .handle = h } else null,
.stdout = if (g_hChildStd_OUT_Rd) |h| .{ .handle = h } else null,
.stderr = if (g_hChildStd_ERR_Rd) |h| .{ .handle = h } else null,
.stdin = if (g_hChildStd_IN_Wr) |h| .{ .handle = h, .flags = .{ .nonblocking = false } } else null,
.stdout = if (g_hChildStd_OUT_Rd) |h| .{ .handle = h, .flags = .{ .nonblocking = true } } else null,
.stderr = if (g_hChildStd_ERR_Rd) |h| .{ .handle = h, .flags = .{ .nonblocking = true } } else null,
.request_resource_usage_statistics = options.request_resource_usage_statistics,
};
}
@@ -14607,7 +15199,7 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
t.mutex.lock(); // Another thread might have won the race.
defer t.mutex.unlock();
if (t.random_file.handle) |prev_handle| {
_ = windows.ntdll.NtClose(fresh_handle);
windows.CloseHandle(fresh_handle);
return prev_handle;
} else {
t.random_file.handle = fresh_handle;
@@ -15696,6 +16288,7 @@ fn progressParentFile(userdata: ?*anyopaque) std.Progress.ParentFileError!File {
.pointer => @ptrFromInt(int),
else => return error.UnsupportedOperation,
},
.flags = .{ .nonblocking = false },
};
}
@@ -16375,7 +16968,7 @@ const parking_sleep = struct {
/// Spurious wakeups are possible.
///
/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation.
fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void {
fn park(opt_deadline: ?Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void {
comptime assert(use_parking_futex or use_parking_sleep);
switch (native_os) {
.windows => {
@@ -16431,6 +17024,22 @@ fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) err
}
}
fn deadlineToWindowsInterval(t: *Io.Threaded, deadline: Io.Clock.Timestamp) Io.Clock.Error!windows.LARGE_INTEGER {
// ntdll only supports two combinations:
// * real-time (`.real`) sleeps with absolute deadlines
// * monotonic (`.awake`/`.boot`) sleeps with relative durations
switch (deadline.clock) {
.cpu_process, .cpu_thread => unreachable, // cannot sleep for CPU time
.real => {
return @intCast(@max(@divTrunc(deadline.raw.nanoseconds, 100), 0));
},
.awake, .boot => {
const duration = try deadline.durationFromNow(ioBasic(t));
return @intCast(@min(@divTrunc(-duration.raw.nanoseconds, 100), -1));
},
}
}
const UnparkTid = switch (native_os) {
// `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread handles?
.windows => usize,
+2 -2
View File
@@ -188,8 +188,8 @@ test "cancel blocked read from pipe" {
}),
else => {
const pipe = try std.Io.Threaded.pipe2(.{});
read_end = .{ .handle = pipe[0] };
write_end = .{ .handle = pipe[1] };
read_end = .{ .handle = pipe[0], .flags = .{ .nonblocking = false } };
write_end = .{ .handle = pipe[1], .flags = .{ .nonblocking = false } };
},
}
defer {
+2 -2
View File
@@ -979,12 +979,13 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
if (main_parent == .unused) continue;
const file: Io.File = .{
.handle = main_storage.getIpcFd() orelse continue,
.flags = .{ .nonblocking = true },
};
const opt_saved_metadata = findOld(file.handle, old_ipc_metadata_fds, old_ipc_metadata);
var bytes_read: usize = 0;
while (true) {
const n = file.readStreaming(io, &.{pipe_buf[bytes_read..]}) catch |err| switch (err) {
error.WouldBlock => break,
error.WouldBlock, error.EndOfStream => break,
else => |e| {
std.log.debug("failed to read child progress data: {t}", .{e});
main_storage.completed_count = 0;
@@ -992,7 +993,6 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
continue :main_loop;
},
};
if (n == 0) break;
if (opt_saved_metadata) |m| {
if (m.remaining_read_trash_bytes > 0) {
assert(bytes_read == 0);
+2 -1
View File
@@ -336,10 +336,11 @@ pub fn init(input: *Reader, output: *Writer, options: Options) InitError!Client
// Ensure the input buffer pointer is stable in this scope.
input.rebase(tls.max_ciphertext_record_len) catch |err| switch (err) {
error.EndOfStream => {}, // We have assurance the remainder of stream can be buffered.
error.ReadFailed => |e| return e,
};
const record_header = input.peek(tls.record_header_len) catch |err| switch (err) {
error.EndOfStream => return error.TlsConnectionTruncated,
error.ReadFailed => return error.ReadFailed,
error.ReadFailed => |e| return e,
};
const record_ct = input.takeEnumNonexhaustive(tls.ContentType, .big) catch unreachable; // already peeked
input.toss(2); // legacy_version
-3
View File
@@ -188,9 +188,6 @@ pub extern "kernel32" fn PostQueuedCompletionStatus(
lpOverlapped: ?*OVERLAPPED,
) callconv(.winapi) BOOL;
// TODO:
// GetOverlappedResultEx with bAlertable=false, which calls: GetStdHandle + WaitForSingleObjectEx.
// Uses the SwitchBack system to run implementations for older programs; Do we care about this?
pub extern "kernel32" fn GetOverlappedResult(
hFile: HANDLE,
lpOverlapped: *OVERLAPPED,
+9 -2
View File
@@ -594,6 +594,13 @@ pub extern "ntdll" fn NtCancelSynchronousIoFile(
IoStatusBlock: *IO_STATUS_BLOCK,
) callconv(.winapi) NTSTATUS;
/// This function has been observed to return SUCCESS on timeout on Windows 10
/// and TIMEOUT on Wine 10.0.
///
/// This function has been observed on Windows 11 such that positive interval
/// is real time, which can cause waits to be interrupted by changing system
/// time, however negative intervals are not affected by changes to system
/// time.
pub extern "ntdll" fn NtDelayExecution(
Alertable: BOOLEAN,
DelayInterval: *const LARGE_INTEGER,
@@ -606,6 +613,6 @@ pub extern "ntdll" fn NtCancelIoFileEx(
) callconv(.winapi) NTSTATUS;
pub extern "ntdll" fn NtCancelIoFile(
handle: HANDLE,
iosbToCancel: *const IO_STATUS_BLOCK,
FileHandle: HANDLE,
IoStatusBlock: *IO_STATUS_BLOCK,
) callconv(.winapi) NTSTATUS;
+6 -3
View File
@@ -126,8 +126,8 @@ test "pipe" {
const io = testing.io;
const fds = try std.Io.Threaded.pipe2(.{});
const out: Io.File = .{ .handle = fds[0] };
const in: Io.File = .{ .handle = fds[1] };
const out: Io.File = .{ .handle = fds[0], .flags = .{ .nonblocking = false } };
const in: Io.File = .{ .handle = fds[1], .flags = .{ .nonblocking = false } };
try in.writeStreamingAll(io, "hello");
var buf: [16]u8 = undefined;
try expect((try out.readStreaming(io, &.{&buf})) == 5);
@@ -150,7 +150,10 @@ test "memfd_create" {
else => return error.SkipZigTest,
}
const file: Io.File = .{ .handle = try posix.memfd_create("test", 0) };
const file: Io.File = .{
.handle = try posix.memfd_create("test", 0),
.flags = .{ .nonblocking = false },
};
defer file.close(io);
try file.writePositionalAll(io, "test", 0);
+37 -15
View File
@@ -453,14 +453,16 @@ pub fn spawnPath(io: Io, dir: Io.Dir, options: SpawnOptions) SpawnError!Child {
return io.vtable.processSpawnPath(io.userdata, dir, options);
}
pub const RunError = CurrentPathError || posix.ReadError || SpawnError || posix.PollError || error{
StdoutStreamTooLong,
StderrStreamTooLong,
};
pub const RunError = error{
StreamTooLong,
} || SpawnError || Io.File.MultiReader.UnendingError || Io.Timeout.Error;
pub const RunOptions = struct {
argv: []const []const u8,
max_output_bytes: usize = 50 * 1024,
stderr_limit: Io.Limit = .unlimited,
stdout_limit: Io.Limit = .unlimited,
/// How many bytes to initially allocate for stderr and stdout.
reserve_amount: usize = 64,
/// Set to change the current working directory when spawning the child process.
cwd: ?[]const u8 = null,
@@ -486,6 +488,7 @@ pub const RunOptions = struct {
create_no_window: bool = true,
/// Darwin-only. Disable ASLR for the child process.
disable_aslr: bool = false,
timeout: Io.Timeout = .none,
};
pub const RunResult = struct {
@@ -513,22 +516,41 @@ pub fn run(gpa: Allocator, io: Io, options: RunOptions) RunError!RunResult {
});
defer child.kill(io);
var stdout: std.ArrayList(u8) = .empty;
defer stdout.deinit(gpa);
var stderr: std.ArrayList(u8) = .empty;
defer stderr.deinit(gpa);
var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined;
var multi_reader: Io.File.MultiReader = undefined;
multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ child.stdout.?, child.stderr.? });
defer multi_reader.deinit();
try child.collectOutput(gpa, &stdout, &stderr, options.max_output_bytes);
const stdout_reader = multi_reader.reader(0);
const stderr_reader = multi_reader.reader(1);
while (multi_reader.fill(options.reserve_amount, options.timeout)) |_| {
if (options.stdout_limit.toInt()) |limit| {
if (stdout_reader.buffered().len > limit)
return error.StreamTooLong;
}
if (options.stderr_limit.toInt()) |limit| {
if (stderr_reader.buffered().len > limit)
return error.StreamTooLong;
}
} else |err| switch (err) {
error.EndOfStream => {},
else => |e| return e,
}
try multi_reader.checkAnyError();
const term = try child.wait(io);
const owned_stdout = try stdout.toOwnedSlice(gpa);
errdefer gpa.free(owned_stdout);
const owned_stderr = try stderr.toOwnedSlice(gpa);
const stdout_slice = try multi_reader.toOwnedSlice(0);
errdefer gpa.free(stdout_slice);
const stderr_slice = try multi_reader.toOwnedSlice(1);
errdefer gpa.free(stderr_slice);
return .{
.stdout = owned_stdout,
.stderr = owned_stderr,
.stdout = stdout_slice,
.stderr = stderr_slice,
.term = term,
};
}
-52
View File
@@ -9,7 +9,6 @@ const process = std.process;
const File = std.Io.File;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList;
pub const Id = switch (native_os) {
.windows => std.os.windows.HANDLE,
@@ -125,54 +124,3 @@ pub fn wait(child: *Child, io: Io) WaitError!Term {
assert(child.id != null);
return io.vtable.childWait(io.userdata, child);
}
/// Collect the output from the process's stdout and stderr. Will return once all output
/// has been collected. This does not mean that the process has ended. `wait` should still
/// be called to wait for and clean up the process.
///
/// The process must have been started with stdout and stderr set to
/// `process.SpawnOptions.StdIo.pipe`.
pub fn collectOutput(
child: *const Child,
/// Used for `stdout` and `stderr`.
allocator: Allocator,
stdout: *ArrayList(u8),
stderr: *ArrayList(u8),
max_output_bytes: usize,
) !void {
var poller = std.Io.poll(allocator, enum { stdout, stderr }, .{
.stdout = child.stdout.?,
.stderr = child.stderr.?,
});
defer poller.deinit();
const stdout_r = poller.reader(.stdout);
stdout_r.buffer = stdout.allocatedSlice();
stdout_r.seek = 0;
stdout_r.end = stdout.items.len;
const stderr_r = poller.reader(.stderr);
stderr_r.buffer = stderr.allocatedSlice();
stderr_r.seek = 0;
stderr_r.end = stderr.items.len;
defer {
stdout.* = .{
.items = stdout_r.buffer[0..stdout_r.end],
.capacity = stdout_r.buffer.len,
};
stderr.* = .{
.items = stderr_r.buffer[0..stderr_r.end],
.capacity = stderr_r.buffer.len,
};
stdout_r.buffer = &.{};
stderr_r.buffer = &.{};
}
while (try poller.poll()) {
if (stdout_r.bufferedLen() > max_output_bytes)
return error.StdoutStreamTooLong;
if (stderr_r.bufferedLen() > max_output_bytes)
return error.StderrStreamTooLong;
}
}
+4 -1
View File
@@ -29,7 +29,10 @@ pub fn get(p: *const Preopens, name: []const u8) ?Resource {
switch (native_os) {
.wasi => {
const index = p.map.getIndex(name) orelse return null;
if (index <= 2) return .{ .file = .{ .handle = @intCast(index) } };
if (index <= 2) return .{ .file = .{
.handle = @intCast(index),
.flags = .{ .nonblocking = false },
} };
return .{ .dir = .{ .handle = @intCast(index) } };
},
else => {
+4 -2
View File
@@ -268,7 +268,8 @@ fn findNativeIncludeDirPosix(self: *LibCInstallation, gpa: Allocator, io: Io, ar
});
const run_res = std.process.run(gpa, io, .{
.max_output_bytes = 1024 * 1024,
.stdout_limit = .limited(1024 * 1024),
.stderr_limit = .limited(1024 * 1024),
.argv = argv.items,
.environ_map = &environ_map,
// Some C compilers, such as Clang, are known to rely on argv[0] to find the path
@@ -584,7 +585,8 @@ fn ccPrintFileName(gpa: Allocator, io: Io, args: CCPrintFileNameOptions) ![]u8 {
try argv.append(arg1);
const run_res = std.process.run(gpa, io, .{
.max_output_bytes = 1024 * 1024,
.stdout_limit = .limited(1024 * 1024),
.stderr_limit = .limited(1024 * 1024),
.argv = argv.items,
.environ_map = &environ_map,
// Some C compilers, such as Clang, are known to rely on argv[0] to find the path
-1
View File
@@ -420,7 +420,6 @@ pub fn resolveTargetQuery(io: Io, query: Target.Query) DetectError!Target {
error.Canceled => |e| return e,
error.Unexpected => |e| return e,
error.WouldBlock => return error.Unexpected,
error.BrokenPipe => return error.Unexpected,
error.ConnectionResetByPeer => return error.Unexpected,
error.NotOpenForReading => return error.Unexpected,
error.SocketUnconnected => return error.Unexpected,
+33 -18
View File
@@ -6873,6 +6873,7 @@ fn spawnZigRc(
child_progress_node: std.Progress.Node,
) !void {
const io = comp.io;
const gpa = comp.gpa;
var node_name: std.ArrayList(u8) = .empty;
defer node_name.deinit(arena);
@@ -6887,55 +6888,69 @@ fn spawnZigRc(
});
defer child.kill(io);
var poller = std.Io.poll(comp.gpa, enum { stdout, stderr }, .{
.stdout = child.stdout.?,
.stderr = child.stderr.?,
});
defer poller.deinit();
var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined;
var multi_reader: Io.File.MultiReader = undefined;
multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ child.stdout.?, child.stderr.? });
defer multi_reader.deinit();
const stdout = poller.reader(.stdout);
const stdout = multi_reader.fileReader(0);
const MessageHeader = std.zig.Server.Message.Header;
poll: while (true) {
const MessageHeader = std.zig.Server.Message.Header;
while (stdout.buffered().len < @sizeOf(MessageHeader)) if (!try poller.poll()) break :poll;
const header = stdout.takeStruct(MessageHeader, .little) catch unreachable;
while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll;
const body = stdout.take(header.bytes_len) catch unreachable;
var eos_err: error{EndOfStream}!void = {};
while (true) {
const header = stdout.interface.takeStruct(MessageHeader, .little) catch |err| switch (err) {
error.EndOfStream => break,
error.ReadFailed => return stdout.err.?,
};
const body = stdout.interface.take(header.bytes_len) catch |err| switch (err) {
error.EndOfStream => |e| {
// Better to report the crash with stderr below, but we set
// this in case the child exits successfully while violating
// this protocol.
eos_err = e;
break;
},
error.ReadFailed => return stdout.err.?,
};
switch (header.tag) {
// We expect exactly one ErrorBundle, and if any error_bundle header is
// sent then it's a fatal error.
.error_bundle => {
const error_bundle = try std.zig.Server.allocErrorBundle(comp.gpa, body);
const error_bundle = try std.zig.Server.allocErrorBundle(gpa, body);
return comp.failWin32ResourceWithOwnedBundle(win32_resource, error_bundle);
},
else => {}, // ignore other messages
}
}
// Just in case there's a failure that didn't send an ErrorBundle (e.g. an error return trace)
const stderr = poller.reader(.stderr);
try multi_reader.fillRemaining(.none);
// Just in case there's a failure that didn't send an ErrorBundle (e.g. an error return trace)
const term = child.wait(io) catch |err| {
return comp.failWin32Resource(win32_resource, "unable to wait for {s} rc: {t}", .{ argv[0], err });
};
const stderr = multi_reader.reader(1).buffered();
switch (term) {
.exited => |code| {
if (code != 0) {
log.err("zig rc failed with stderr:\n{s}", .{stderr.buffered()});
log.err("zig rc failed with stderr:\n{s}", .{stderr});
return comp.failWin32Resource(win32_resource, "zig rc exited with code {d}", .{code});
}
},
.signal => |sig| {
log.err("zig rc signaled {t} with stderr:\n{s}", .{ sig, stderr.buffered() });
log.err("zig rc signaled {t} with stderr:\n{s}", .{ sig, stderr });
return comp.failWin32Resource(win32_resource, "zig rc terminated unexpectedly", .{});
},
else => {
log.err("zig rc terminated with stderr:\n{s}", .{stderr.buffered()});
log.err("zig rc terminated with stderr:\n{s}", .{stderr});
return comp.failWin32Resource(win32_resource, "zig rc terminated unexpectedly", .{});
},
}
try eos_err;
}
pub fn tmpFilePath(comp: Compilation, ally: Allocator, suffix: []const u8) error{OutOfMemory}![]const u8 {
+2 -2
View File
@@ -2389,7 +2389,7 @@ pub const Pool = struct {
.nonstring = elem_ctype.isAnyChar() and switch (ptr_info.sentinel) {
.none => true,
.zero_u8 => false,
else => |sentinel| Value.fromInterned(sentinel).orderAgainstZero(zcu).compare(.neq),
else => |sentinel| !Value.fromInterned(sentinel).compareAllWithZero(.eq, zcu),
},
});
},
@@ -2438,7 +2438,7 @@ pub const Pool = struct {
.nonstring = elem_ctype.isAnyChar() and switch (array_info.sentinel) {
.none => true,
.zero_u8 => false,
else => |sentinel| Value.fromInterned(sentinel).orderAgainstZero(zcu).compare(.neq),
else => |sentinel| !Value.fromInterned(sentinel).compareAllWithZero(.eq, zcu),
},
});
if (!kind.isParameter()) return array_ctype;
+9 -2
View File
@@ -605,8 +605,8 @@ pub const File = struct {
switch (base.tag) {
.lld => assert(base.file == null),
.elf, .macho, .wasm => {
if (base.file != null) return;
dev.checkAny(&.{ .coff_linker, .elf_linker, .macho_linker, .plan9_linker, .wasm_linker });
if (base.file != null) return;
const emit = base.emit;
if (base.child_pid) |pid| {
if (builtin.os.tag == .windows) {
@@ -645,6 +645,7 @@ pub const File = struct {
base.file = try emit.root_dir.handle.openFile(io, emit.sub_path, .{ .mode = .read_write });
},
.elf2, .coff2 => if (base.file == null) {
dev.checkAny(&.{ .elf2_linker, .coff2_linker });
const mf = if (base.cast(.elf2)) |elf|
&elf.mf
else if (base.cast(.coff2)) |coff|
@@ -657,7 +658,13 @@ pub const File = struct {
base.file = mf.memory_map.file;
try mf.ensureTotalCapacity(@intCast(mf.nodes.items[0].location().resolve(mf)[1]));
},
.c, .spirv => dev.checkAny(&.{ .c_linker, .spirv_linker }),
.c => if (base.file == null) {
dev.check(.c_linker);
base.file = try base.emit.root_dir.handle.openFile(io, base.emit.sub_path, .{
.mode = .write_only,
});
},
.spirv => dev.check(.spirv_linker),
.plan9 => unreachable,
}
}
-6
View File
@@ -201,7 +201,6 @@ fn printOutput(
.argv = build_args.items,
.cwd = tmp_dir_path,
.environ_map = environ_map,
.max_output_bytes = max_doc_file_size,
});
switch (result.term) {
.exited => |exit_code| {
@@ -257,7 +256,6 @@ fn printOutput(
.argv = run_args,
.environ_map = environ_map,
.cwd = tmp_dir_path,
.max_output_bytes = max_doc_file_size,
});
switch (result.term) {
.exited => |exit_code| {
@@ -376,7 +374,6 @@ fn printOutput(
.argv = test_args.items,
.environ_map = environ_map,
.cwd = tmp_dir_path,
.max_output_bytes = max_doc_file_size,
});
switch (result.term) {
.exited => |exit_code| {
@@ -432,7 +429,6 @@ fn printOutput(
.argv = test_args.items,
.environ_map = environ_map,
.cwd = tmp_dir_path,
.max_output_bytes = max_doc_file_size,
});
switch (result.term) {
.exited => |exit_code| {
@@ -508,7 +504,6 @@ fn printOutput(
.argv = build_args.items,
.environ_map = environ_map,
.cwd = tmp_dir_path,
.max_output_bytes = max_doc_file_size,
});
switch (result.term) {
.exited => |exit_code| {
@@ -1132,7 +1127,6 @@ fn run(
.argv = args,
.environ_map = environ_map,
.cwd = cwd,
.max_output_bytes = max_doc_file_size,
});
switch (result.term) {
.exited => |exit_code| {
+44 -36
View File
@@ -28,6 +28,7 @@ fn logImpl(
}
pub fn main(init: std.process.Init) !void {
const gpa = init.gpa;
const fatal = std.process.fatal;
const arena = init.arena.allocator();
const io = init.io;
@@ -224,11 +225,10 @@ pub fn main(init: std.process.Init) !void {
.enable_darling = enable_darling,
};
var poller = Io.poll(arena, Eval.StreamEnum, .{
.stdout = child.stdout.?,
.stderr = child.stderr.?,
});
defer poller.deinit();
var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined;
var multi_reader: Io.File.MultiReader = undefined;
multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ child.stdout.?, child.stderr.? });
defer multi_reader.deinit();
for (case.updates) |update| {
var update_node = target_prog_node.start(update.name, 0);
@@ -243,10 +243,10 @@ pub fn main(init: std.process.Init) !void {
eval.write(update);
try eval.requestUpdate();
try eval.check(&poller, update, update_node);
try eval.check(&multi_reader, update, update_node);
}
try eval.end(&poller);
try eval.end(&multi_reader);
waitChild(&child, &eval);
}
@@ -272,9 +272,6 @@ const Eval = struct {
enable_wasmtime: bool,
enable_darling: bool,
const StreamEnum = enum { stdout, stderr };
const Poller = Io.Poller(StreamEnum);
/// Currently this function assumes the previous updates have already been written.
fn write(eval: *Eval, update: Case.Update) void {
const io = eval.io;
@@ -293,23 +290,29 @@ const Eval = struct {
}
}
fn check(eval: *Eval, poller: *Poller, update: Case.Update, prog_node: std.Progress.Node) !void {
fn check(eval: *Eval, mr: *Io.File.MultiReader, update: Case.Update, prog_node: std.Progress.Node) !void {
const arena = eval.arena;
const stdout = poller.reader(.stdout);
const stderr = poller.reader(.stderr);
const stdout = mr.fileReader(0);
const stderr = &mr.fileReader(1).interface;
const Header = std.zig.Server.Message.Header;
poll: while (true) {
const Header = std.zig.Server.Message.Header;
while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll;
const header = stdout.takeStruct(Header, .little) catch unreachable;
while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll;
const body = stdout.take(header.bytes_len) catch unreachable;
while (true) {
const header = stdout.interface.takeStruct(Header, .little) catch |err| switch (err) {
error.EndOfStream => break,
error.ReadFailed => return stdout.err.?,
};
const body = stdout.interface.take(header.bytes_len) catch |err| switch (err) {
// If this panic triggers it might be helpful to rework this
// code to print the stderr from the abnormally terminated child.
error.EndOfStream => @panic("unexpected mid-message end of stream"),
error.ReadFailed => return stdout.err.?,
};
switch (header.tag) {
.error_bundle => {
const result_error_bundle = try std.zig.Server.allocErrorBundle(arena, body);
if (stderr.bufferedLen() > 0) {
const stderr_data = try poller.toOwnedSlice(.stderr);
const stderr_data = try mr.toOwnedSlice(1);
if (eval.allow_stderr) {
std.log.info("error_bundle stderr:\n{s}", .{stderr_data});
} else {
@@ -326,7 +329,7 @@ const Eval = struct {
var r: std.Io.Reader = .fixed(body);
_ = r.takeStruct(std.zig.Server.Message.EmitDigest, .little) catch unreachable;
if (stderr.bufferedLen() > 0) {
const stderr_data = try poller.toOwnedSlice(.stderr);
const stderr_data = try mr.toOwnedSlice(1);
if (eval.allow_stderr) {
std.log.info("emit_digest stderr:\n{s}", .{stderr_data});
} else {
@@ -358,11 +361,12 @@ const Eval = struct {
}
}
if (stderr.bufferedLen() > 0) {
const buffered_stderr = stderr.buffered();
if (buffered_stderr.len > 0) {
if (eval.allow_stderr) {
std.log.info("stderr:\n{s}", .{stderr.buffered()});
std.log.info("stderr:\n{s}", .{buffered_stderr});
} else {
eval.fatal("unexpected stderr:\n{s}", .{stderr.buffered()});
eval.fatal("unexpected stderr:\n{s}", .{buffered_stderr});
}
}
@@ -588,23 +592,27 @@ const Eval = struct {
};
}
fn end(eval: *Eval, poller: *Poller) !void {
fn end(eval: *Eval, mr: *Io.File.MultiReader) !void {
requestExit(eval.child, eval);
const stdout = poller.reader(.stdout);
const stderr = poller.reader(.stderr);
const stdout = mr.fileReader(0);
const Header = std.zig.Server.Message.Header;
poll: while (true) {
const Header = std.zig.Server.Message.Header;
while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll;
const header = stdout.takeStruct(Header, .little) catch unreachable;
while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll;
stdout.toss(header.bytes_len);
while (true) {
const header = stdout.interface.takeStruct(Header, .little) catch |err| switch (err) {
error.EndOfStream => break,
error.ReadFailed => return stdout.err.?,
};
stdout.interface.discardAll(header.bytes_len) catch |err| switch (err) {
error.ReadFailed => return stdout.err.?,
error.EndOfStream => |e| return e,
};
}
if (stderr.bufferedLen() > 0) {
eval.fatal("unexpected stderr:\n{s}", .{stderr.buffered()});
}
try mr.fillRemaining(.none);
const stderr = mr.reader(1).buffered();
if (stderr.len > 0) eval.fatal("unexpected stderr:\n{s}", .{stderr});
}
fn buildCOutput(eval: *Eval, c_path: []const u8, out_path: []const u8, prog_node: std.Progress.Node) !void {
-1
View File
@@ -676,7 +676,6 @@ pub fn main(init: std.process.Init) !void {
const child_result = try std.process.run(arena, io, .{
.argv = &child_args,
.max_output_bytes = 100 * 1024 * 1024,
});
std.debug.print("{s}\n", .{child_result.stderr});
-1
View File
@@ -1987,7 +1987,6 @@ fn processOneTarget(io: Io, job: Job) void {
const child_result = try std.process.run(arena, io, .{
.argv = &child_args,
.max_output_bytes = 500 * 1024 * 1024,
});
tblgen_progress.end();
if (child_result.stderr.len != 0) {