diff --git a/build.zig b/build.zig index 9835714efd..84cbba38bd 100644 --- a/build.zig +++ b/build.zig @@ -29,7 +29,7 @@ pub fn build(b: *std.Build) !void { const use_zig_libcxx = b.option(bool, "use-zig-libcxx", "If libc++ is needed, use zig's bundled version, don't try to integrate with the system") orelse false; const test_step = b.step("test", "Run all the tests"); - const skip_install_lib_files = b.option(bool, "no-lib", "skip copying of lib/ files and langref to installation prefix. Useful for development") orelse false; + const skip_install_lib_files = b.option(bool, "no-lib", "skip copying of lib/ files and langref to installation prefix. Useful for development") orelse only_c; const skip_install_langref = b.option(bool, "no-langref", "skip copying of langref to the installation prefix") orelse skip_install_lib_files; const std_docs = b.option(bool, "std-docs", "include standard library autodocs") orelse false; const no_bin = b.option(bool, "no-bin", "skip emitting compiler binary") orelse false; @@ -472,27 +472,7 @@ pub fn build(b: *std.Build) !void { .skip_linux = skip_linux, .skip_llvm = skip_llvm, .skip_libc = skip_libc, - .max_rss = switch (b.graph.host.result.os.tag) { - .freebsd => 2_000_000_000, - .linux => switch (b.graph.host.result.cpu.arch) { - .aarch64 => 659_809_075, - .loongarch64 => 598_902_374, - .powerpc64le => 627_431_833, - .riscv64 => 827_043_430, - .s390x => 580_596_121, - .x86_64 => 3_290_894_745, - else => 3_300_000_000, - }, - .macos => switch (b.graph.host.result.cpu.arch) { - .aarch64 => 767_736_217, - else => 800_000_000, - }, - .windows => switch (b.graph.host.result.cpu.arch) { - .x86_64 => 603_070_054, - else => 700_000_000, - }, - else => 3_300_000_000, - }, + .max_rss = 3_300_000_000, })); test_modules_step.dependOn(tests.addModuleTests(b, .{ @@ -518,23 +498,7 @@ pub fn build(b: *std.Build) !void { .skip_llvm = skip_llvm, .skip_libc = true, .no_builtin = true, - .max_rss = switch (b.graph.host.result.os.tag) { - .freebsd => 800_000_000, - .linux => switch (b.graph.host.result.cpu.arch) { - .aarch64 => 639_565_414, - .loongarch64 => 598_884_352, - .powerpc64le => 597_897_625, - .riscv64 => 636_429_516, - .s390x => 574_166_630, - .x86_64 => 978_463_129, - else => 900_000_000, - }, - .macos => switch (b.graph.host.result.cpu.arch) { - .aarch64 => 701_413_785, - else => 800_000_000, - }, - else => 900_000_000, - }, + .max_rss = 900_000_000, })); test_modules_step.dependOn(tests.addModuleTests(b, .{ diff --git a/lib/std/Build/Step.zig b/lib/std/Build/Step.zig index 24e00bea5e..cfc263b770 100644 --- a/lib/std/Build/Step.zig +++ b/lib/std/Build/Step.zig @@ -381,10 +381,17 @@ pub fn addError(step: *Step, comptime fmt: []const u8, args: anytype) error{OutO pub const ZigProcess = struct { child: std.process.Child, - poller: Io.Poller(StreamEnum), + multi_reader_buffer: Io.File.MultiReader.Buffer(2), + multi_reader: Io.File.MultiReader, progress_ipc_fd: if (std.Progress.have_ipc) ?std.posix.fd_t else void, pub const StreamEnum = enum { stdout, stderr }; + + pub fn deinit(zp: *ZigProcess, io: Io) void { + zp.child.kill(io); + zp.multi_reader.deinit(); + zp.* = undefined; + } }; /// Assumes that argv contains `--listen=-` and that the process being spawned @@ -409,7 +416,8 @@ pub fn evalZigProcess( assert(watch); if (std.Progress.have_ipc) if (zp.progress_ipc_fd) |fd| prog_node.setIpcFd(fd); const result = zigProcessUpdate(s, zp, watch, web_server, gpa) catch |err| switch (err) { - error.BrokenPipe => { + error.BrokenPipe, error.EndOfStream => |reason| { + std.log.info("{s} restart required: {t}", .{ argv[0], reason }); // Process restart required. const term = zp.child.wait(io) catch |e| { return s.fail("unable to wait for {s}: {t}", .{ argv[0], e }); @@ -455,18 +463,18 @@ pub fn evalZigProcess( .request_resource_usage_statistics = true, .progress_node = prog_node, }) catch |err| return s.fail("failed to spawn zig compiler {s}: {t}", .{ argv[0], err }); - defer if (!watch) zp.child.kill(io); zp.* = .{ .child = zp.child, - .poller = Io.poll(gpa, ZigProcess.StreamEnum, .{ - .stdout = zp.child.stdout.?, - .stderr = zp.child.stderr.?, - }), + .multi_reader_buffer = undefined, + .multi_reader = undefined, .progress_ipc_fd = if (std.Progress.have_ipc) prog_node.getIpcFd() else {}, }; + zp.multi_reader.init(gpa, io, zp.multi_reader_buffer.toStreams(), &.{ + zp.child.stdout.?, zp.child.stderr.?, + }); if (watch) s.setZigProcess(zp); - defer if (!watch) zp.poller.deinit(); + defer if (!watch) zp.deinit(io); const result = try zigProcessUpdate(s, zp, watch, web_server, gpa); @@ -532,15 +540,26 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build. if (!watch) try sendMessage(io, zp.child.stdin.?, .exit); var result: ?Path = null; + var eos_err: error{EndOfStream}!void = {}; - const stdout = zp.poller.reader(.stdout); + const stdout = zp.multi_reader.fileReader(0); - poll: while (true) { + while (true) { const Header = std.zig.Server.Message.Header; - while (stdout.buffered().len < @sizeOf(Header)) if (!try zp.poller.poll()) break :poll; - const header = stdout.takeStruct(Header, .little) catch unreachable; - while (stdout.buffered().len < header.bytes_len) if (!try zp.poller.poll()) break :poll; - const body = stdout.take(header.bytes_len) catch unreachable; + const header = stdout.interface.takeStruct(Header, .little) catch |err| switch (err) { + error.EndOfStream => break, + error.ReadFailed => return stdout.err.?, + }; + const body = stdout.interface.take(header.bytes_len) catch |err| switch (err) { + error.EndOfStream => |e| { + // Better to report the crash with stderr below, but we set + // this in case the child exits successfully while violating + // this protocol. + eos_err = e; + break; + }, + error.ReadFailed => return stdout.err.?, + }; switch (header.tag) { .zig_version => { if (!std.mem.eql(u8, builtin.zig_version_string, body)) { @@ -553,11 +572,11 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build. .error_bundle => { s.result_error_bundle = try std.zig.Server.allocErrorBundle(gpa, body); // This message indicates the end of the update. - if (watch) break :poll; + if (watch) break; }, .emit_digest => { const EmitDigest = std.zig.Server.Message.EmitDigest; - const emit_digest = @as(*align(1) const EmitDigest, @ptrCast(body)); + const emit_digest: *align(1) const EmitDigest = @ptrCast(body); s.result_cached = emit_digest.flags.cache_hit; const digest = body[@sizeOf(EmitDigest)..][0..Cache.bin_digest_len]; result = .{ @@ -631,11 +650,13 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build. s.result_duration_ns = timer.read(); - const stderr_contents = try zp.poller.toOwnedSlice(.stderr); + const stderr_contents = zp.multi_reader.reader(1).buffered(); if (stderr_contents.len > 0) { try s.result_error_msgs.append(arena, try arena.dupe(u8, stderr_contents)); } + try eos_err; + return result; } diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig index d025e01af4..68d0ec480c 100644 --- a/lib/std/Build/Step/Run.zig +++ b/lib/std/Build/Step/Run.zig @@ -1385,14 +1385,12 @@ fn runCommand( break :term spawnChildAndCollect(run, interp_argv.items, &environ_map, has_side_effects, options, fuzz_context) catch |e| { if (!run.failing_to_execute_foreign_is_an_error) return error.MakeSkipped; if (e == error.MakeFailed) return error.MakeFailed; // error already reported - return step.fail("unable to spawn interpreter {s}: {s}", .{ - interp_argv.items[0], @errorName(e), - }); + return step.fail("unable to spawn interpreter {s}: {t}", .{ interp_argv.items[0], e }); }; } if (err == error.MakeFailed) return error.MakeFailed; // error already reported - return step.fail("failed to spawn and capture stdio from {s}: {s}", .{ argv[0], @errorName(err) }); + return step.fail("failed to spawn and capture stdio from {s}: {t}", .{ argv[0], err }); }; const generic_result = opt_generic_result orelse { @@ -1589,9 +1587,13 @@ fn spawnChildAndCollect( }; if (run.stdio == .zig_test) { - var timer = try std.time.Timer.start(); - defer run.step.result_duration_ns = timer.read(); - try evalZigTest(run, spawn_options, options, fuzz_context); + const started: Io.Clock.Timestamp = try .now(io, .awake); + const result = evalZigTest(run, spawn_options, options, fuzz_context) catch |err| switch (err) { + error.Canceled => |e| return e, + else => |e| e, + }; + run.step.result_duration_ns = @intCast((try started.untilNow(io)).raw.nanoseconds); + try result; return null; } else { const inherit = spawn_options.stdout == .inherit or spawn_options.stderr == .inherit; @@ -1604,10 +1606,14 @@ fn spawnChildAndCollect( } else .no_color; defer if (inherit) io.unlockStderr(); try setColorEnvironmentVariables(run, environ_map, terminal_mode); - var timer = try std.time.Timer.start(); - const res = try evalGeneric(run, spawn_options); - run.step.result_duration_ns = timer.read(); - return .{ .term = res.term, .stdout = res.stdout, .stderr = res.stderr }; + + const started: Io.Clock.Timestamp = try .now(io, .awake); + const result = evalGeneric(run, spawn_options) catch |err| switch (err) { + error.Canceled => |e| return e, + else => |e| e, + }; + run.step.result_duration_ns = @intCast((try started.untilNow(io)).raw.nanoseconds); + return try result; } } @@ -1669,39 +1675,42 @@ fn evalZigTest( while (true) { var child = try process.spawn(io, spawn_options); - var poller = std.Io.poll(gpa, StdioPollEnum, .{ - .stdout = child.stdout.?, - .stderr = child.stderr.?, - }); + var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined; + var multi_reader: Io.File.MultiReader = undefined; + multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ child.stdout.?, child.stderr.? }); var child_killed = false; defer if (!child_killed) { child.kill(io); - poller.deinit(); + multi_reader.deinit(); run.step.result_peak_rss = @max( run.step.result_peak_rss, child.resource_usage_statistics.getMaxRss() orelse 0, ); }; - switch (try pollZigTest( + switch (try waitZigTest( run, &child, options, fuzz_context, - &poller, + &multi_reader, &test_metadata, &test_results, )) { .write_failed => |err| { // The runner unexpectedly closed a stdio pipe, which means a crash. Make sure we've captured // all available stderr to make our error output as useful as possible. - while (try poller.poll()) {} - run.step.result_stderr = try arena.dupe(u8, poller.reader(.stderr).buffered()); + const stderr_fr = multi_reader.fileReader(1); + while (stderr_fr.interface.fillMore()) |_| {} else |e| switch (e) { + error.ReadFailed => return stderr_fr.err.?, + error.EndOfStream => {}, + } + run.step.result_stderr = try arena.dupe(u8, stderr_fr.interface.buffered()); // Clean up everything and wait for the child to exit. child.stdin.?.close(io); child.stdin = null; - poller.deinit(); + multi_reader.deinit(); child_killed = true; const term = try child.wait(io); run.step.result_peak_rss = @max( @@ -1716,13 +1725,13 @@ fn evalZigTest( .no_poll => |no_poll| { // This might be a success (we requested exit and the child dutifully closed stdout) or // a crash of some kind. Either way, the child will terminate by itself -- wait for it. - const stderr_owned = try arena.dupe(u8, poller.reader(.stderr).buffered()); - poller.reader(.stderr).tossBuffered(); + const stderr_reader = multi_reader.reader(1); + const stderr_owned = try arena.dupe(u8, stderr_reader.buffered()); // Clean up everything and wait for the child to exit. child.stdin.?.close(io); child.stdin = null; - poller.deinit(); + multi_reader.deinit(); child_killed = true; const term = try child.wait(io); run.step.result_peak_rss = @max( @@ -1770,8 +1779,9 @@ fn evalZigTest( return; }, .timeout => |timeout| { - const stderr = poller.reader(.stderr).buffered(); - poller.reader(.stderr).tossBuffered(); + const stderr_reader = multi_reader.reader(1); + const stderr = stderr_reader.buffered(); + stderr_reader.tossBuffered(); if (timeout.active_test_index) |test_index| { // A test was running. Report the timeout against that test, and continue on to // the next test. @@ -1796,16 +1806,16 @@ fn evalZigTest( } } -/// Polls stdout of a Zig test process until a termination condition is reached: +/// Reads stdout of a Zig test process until a termination condition is reached: /// * A write fails, indicating the child unexpectedly closed stdin /// * A test (or a response from the test runner) times out -/// * `poll` fails, indicating the child closed stdout and stderr -fn pollZigTest( +/// * The wait fails, indicating the child closed stdout and stderr +fn waitZigTest( run: *Run, child: *process.Child, options: Step.MakeOptions, fuzz_context: ?FuzzContext, - poller: *std.Io.Poller(StdioPollEnum), + multi_reader: *Io.File.MultiReader, opt_metadata: *?TestMetadata, results: *Step.TestResults, ) !union(enum) { @@ -1859,9 +1869,7 @@ fn pollZigTest( var active_test_index: ?u32 = null; - // `null` means this host does not support `std.time.Timer`. This timer is `reset()` whenever we - // change `active_test_index`, i.e. whenever a test starts or finishes. - var timer: ?std.time.Timer = std.time.Timer.start() catch null; + var last_update: Io.Clock.Timestamp = try .now(io, .awake); var coverage_id: ?u64 = null; @@ -1869,16 +1877,26 @@ fn pollZigTest( // test. For instance, if the test runner leaves this much time between us requesting a test to // start and it acknowledging the test starting, we terminate the child and raise an error. This // *should* never happen, but could in theory be caused by some very unlucky IB in a test. - const response_timeout_ns: ?u64 = ns: { - if (fuzz_context != null) break :ns null; // don't timeout fuzz tests - break :ns @max(options.unit_test_timeout_ns orelse 0, 60 * std.time.ns_per_s); + const response_timeout: ?Io.Clock.Duration = t: { + if (fuzz_context != null) break :t null; // don't timeout fuzz tests + const ns = @max(options.unit_test_timeout_ns orelse 0, 60 * std.time.ns_per_s); + break :t .{ .clock = .awake, .raw = .fromNanoseconds(ns) }; }; + const test_timeout: ?Io.Clock.Duration = if (options.unit_test_timeout_ns) |ns| .{ + .clock = .awake, + .raw = .fromNanoseconds(ns), + } else null; - const stdout = poller.reader(.stdout); - const stderr = poller.reader(.stderr); + const stdout = multi_reader.reader(0); + const stderr = multi_reader.reader(1); + const Header = std.zig.Server.Message.Header; while (true) { - const Header = std.zig.Server.Message.Header; + const timeout: Io.Timeout = t: { + const opt_duration = if (active_test_index == null) response_timeout else test_timeout; + const duration = opt_duration orelse break :t .none; + break :t .{ .deadline = last_update.addDuration(duration) }; + }; // This block is exited when `stdout` contains enough bytes for a `Header`. header_ready: { @@ -1887,47 +1905,37 @@ fn pollZigTest( break :header_ready; } - // Always `null` if `timer` is `null`. - const opt_timeout_ns: ?u64 = ns: { - if (timer == null) break :ns null; - if (active_test_index == null) break :ns response_timeout_ns; - break :ns options.unit_test_timeout_ns; + multi_reader.fill(64, timeout) catch |err| switch (err) { + error.Timeout => return .{ .timeout = .{ + .active_test_index = active_test_index, + .ns_elapsed = @intCast((try last_update.untilNow(io)).raw.nanoseconds), + } }, + error.EndOfStream => return .{ .no_poll = .{ + .active_test_index = active_test_index, + .ns_elapsed = @intCast((try last_update.untilNow(io)).raw.nanoseconds), + } }, + else => |e| return e, }; - if (opt_timeout_ns) |timeout_ns| { - const remaining_ns = timeout_ns -| timer.?.read(); - if (!try poller.pollTimeout(remaining_ns)) return .{ .no_poll = .{ - .active_test_index = active_test_index, - .ns_elapsed = if (timer) |*t| t.read() else 0, - } }; - } else { - if (!try poller.poll()) return .{ .no_poll = .{ - .active_test_index = active_test_index, - .ns_elapsed = if (timer) |*t| t.read() else 0, - } }; - } - - if (stdout.buffered().len >= @sizeOf(Header)) { - // There wasn't a header before, but there is one after the `poll`. - break :header_ready; - } - - if (opt_timeout_ns) |timeout_ns| { - const cur_ns = timer.?.read(); - if (cur_ns >= timeout_ns) return .{ .timeout = .{ - .active_test_index = active_test_index, - .ns_elapsed = cur_ns, - } }; - } continue; } // There is definitely a header available now -- read it. const header = stdout.takeStruct(Header, .little) catch unreachable; - while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) return .{ .no_poll = .{ - .active_test_index = active_test_index, - .ns_elapsed = if (timer) |*t| t.read() else 0, - } }; + while (stdout.buffered().len < header.bytes_len) { + multi_reader.fill(64, timeout) catch |err| switch (err) { + error.Timeout => return .{ .timeout = .{ + .active_test_index = active_test_index, + .ns_elapsed = @intCast((try last_update.untilNow(io)).raw.nanoseconds), + } }, + error.EndOfStream => return .{ .no_poll = .{ + .active_test_index = active_test_index, + .ns_elapsed = @intCast((try last_update.untilNow(io)).raw.nanoseconds), + } }, + else => |e| return e, + }; + } + const body = stdout.take(header.bytes_len) catch unreachable; var body_r: std.Io.Reader = .fixed(body); switch (header.tag) { @@ -1968,13 +1976,13 @@ fn pollZigTest( @memset(opt_metadata.*.?.ns_per_test, std.math.maxInt(u64)); active_test_index = null; - if (timer) |*t| t.reset(); + last_update = try .now(io, .awake); requestNextTest(io, child.stdin.?, &opt_metadata.*.?, &sub_prog_node) catch |err| return .{ .write_failed = err }; }, .test_started => { active_test_index = opt_metadata.*.?.next_index - 1; - if (timer) |*t| t.reset(); + last_update = try .now(io, .awake); }, .test_results => { assert(fuzz_context == null); @@ -2017,7 +2025,10 @@ fn pollZigTest( } active_test_index = null; - if (timer) |*t| md.ns_per_test[tr_hdr.index] = t.lap(); + + const now: Io.Clock.Timestamp = try .now(io, .awake); + md.ns_per_test[tr_hdr.index] = @intCast(last_update.durationTo(now).raw.nanoseconds); + last_update = now; requestNextTest(io, child.stdin.?, md, &sub_prog_node) catch |err| return .{ .write_failed = err }; }, @@ -2164,6 +2175,7 @@ fn evalGeneric(run: *Run, spawn_options: process.SpawnOptions) !EvalGenericResul const b = run.step.owner; const io = b.graph.io; const arena = b.allocator; + const gpa = b.allocator; var child = try process.spawn(io, spawn_options); defer child.kill(io); @@ -2211,23 +2223,31 @@ fn evalGeneric(run: *Run, spawn_options: process.SpawnOptions) !EvalGenericResul if (child.stdout) |stdout| { if (child.stderr) |stderr| { - var poller = std.Io.poll(arena, enum { stdout, stderr }, .{ - .stdout = stdout, - .stderr = stderr, - }); - defer poller.deinit(); + var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined; + var multi_reader: Io.File.MultiReader = undefined; + multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ stdout, stderr }); + defer multi_reader.deinit(); - while (try poller.poll()) { + const stdout_reader = multi_reader.reader(0); + const stderr_reader = multi_reader.reader(1); + + while (multi_reader.fill(64, .none)) |_| { if (run.stdio_limit.toInt()) |limit| { - if (poller.reader(.stderr).buffered().len > limit) + if (stdout_reader.buffered().len > limit) return error.StdoutStreamTooLong; - if (poller.reader(.stderr).buffered().len > limit) + if (stderr_reader.buffered().len > limit) return error.StderrStreamTooLong; } + } else |err| switch (err) { + error.UnsupportedClock, error.Timeout => unreachable, + error.EndOfStream => {}, + else => |e| return e, } - stdout_bytes = try poller.toOwnedSlice(.stdout); - stderr_bytes = try poller.toOwnedSlice(.stderr); + try multi_reader.checkAnyError(); + + stdout_bytes = try multi_reader.toOwnedSlice(0); + stderr_bytes = try multi_reader.toOwnedSlice(1); } else { var stdout_reader = stdout.readerStreaming(io, &.{}); stdout_bytes = stdout_reader.interface.allocRemaining(arena, run.stdio_limit) catch |err| switch (err) { diff --git a/lib/std/Build/WebServer.zig b/lib/std/Build/WebServer.zig index e1536fb8fa..1f380b6c50 100644 --- a/lib/std/Build/WebServer.zig +++ b/lib/std/Build/WebServer.zig @@ -588,11 +588,12 @@ fn buildClientWasm(ws: *WebServer, arena: Allocator, optimize: std.builtin.Optim }); defer child.kill(io); - var poller = Io.poll(gpa, enum { stdout, stderr }, .{ - .stdout = child.stdout.?, - .stderr = child.stderr.?, - }); - defer poller.deinit(); + var stderr_task = try io.concurrent(readStreamAlloc, .{ gpa, io, child.stderr.?, .unlimited }); + defer if (stderr_task.cancel(io)) |slice| gpa.free(slice) else |_| {}; + + var stdout_buffer: [512]u8 = undefined; + var stdout_reader: Io.File.Reader = .initStreaming(child.stdout.?, io, &stdout_buffer); + const stdout = &stdout_reader.interface; try child.stdin.?.writeStreamingAll(io, @ptrCast(@as([]const std.zig.Client.Message.Header, &.{ .{ .tag = .update, .bytes_len = 0 }, @@ -600,16 +601,17 @@ fn buildClientWasm(ws: *WebServer, arena: Allocator, optimize: std.builtin.Optim }))); const Header = std.zig.Server.Message.Header; + var result: ?Cache.Path = null; var result_error_bundle = std.zig.ErrorBundle.empty; + var body_buffer: std.ArrayList(u8) = .empty; + defer body_buffer.deinit(gpa); - const stdout = poller.reader(.stdout); - - poll: while (true) { - while (stdout.buffered().len < @sizeOf(Header)) if (!(try poller.poll())) break :poll; - const header = stdout.takeStruct(Header, .little) catch unreachable; - while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll; - const body = stdout.take(header.bytes_len) catch unreachable; + while (true) { + const header = try stdout.takeStruct(Header, .little); + body_buffer.clearRetainingCapacity(); + try stdout.appendExact(gpa, &body_buffer, header.bytes_len); + const body = body_buffer.items; switch (header.tag) { .zig_version => { @@ -636,7 +638,7 @@ fn buildClientWasm(ws: *WebServer, arena: Allocator, optimize: std.builtin.Optim } } - const stderr_contents = try poller.toOwnedSlice(.stderr); + const stderr_contents = try stderr_task.await(io); if (stderr_contents.len > 0) { std.debug.print("{s}", .{stderr_contents}); } @@ -697,6 +699,14 @@ fn buildClientWasm(ws: *WebServer, arena: Allocator, optimize: std.builtin.Optim return base_path.join(arena, bin_name); } +fn readStreamAlloc(gpa: Allocator, io: Io, file: Io.File, limit: Io.Limit) ![]u8 { + var file_reader: Io.File.Reader = .initStreaming(file, io, &.{}); + return file_reader.interface.allocRemaining(gpa, limit) catch |err| switch (err) { + error.ReadFailed => return file_reader.err.?, + else => |e| return e, + }; +} + pub fn updateTimeReportCompile(ws: *WebServer, opts: struct { compile: *Build.Step.Compile, diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 6dc0e24731..c3ba3575e4 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -15,463 +15,13 @@ const Io = @This(); const builtin = @import("builtin"); -const is_windows = builtin.os.tag == .windows; const std = @import("std.zig"); -const windows = std.os.windows; -const posix = std.posix; const math = std.math; const assert = std.debug.assert; const Allocator = std.mem.Allocator; const Alignment = std.mem.Alignment; -pub fn poll( - gpa: Allocator, - comptime StreamEnum: type, - files: PollFiles(StreamEnum), -) Poller(StreamEnum) { - const enum_fields = @typeInfo(StreamEnum).@"enum".fields; - var result: Poller(StreamEnum) = .{ - .gpa = gpa, - .readers = @splat(.failing), - .poll_fds = undefined, - .windows = if (is_windows) .{ - .first_read_done = false, - .overlapped = [1]windows.OVERLAPPED{ - std.mem.zeroes(windows.OVERLAPPED), - } ** enum_fields.len, - .small_bufs = undefined, - .active = .{ - .count = 0, - .handles_buf = undefined, - .stream_map = undefined, - }, - } else {}, - }; - - inline for (enum_fields, 0..) |field, i| { - if (is_windows) { - result.windows.active.handles_buf[i] = @field(files, field.name).handle; - } else { - result.poll_fds[i] = .{ - .fd = @field(files, field.name).handle, - .events = posix.POLL.IN, - .revents = undefined, - }; - } - } - - return result; -} - -pub fn Poller(comptime StreamEnum: type) type { - return struct { - const enum_fields = @typeInfo(StreamEnum).@"enum".fields; - const PollFd = if (is_windows) void else posix.pollfd; - - gpa: Allocator, - readers: [enum_fields.len]Reader, - poll_fds: [enum_fields.len]PollFd, - windows: if (is_windows) struct { - first_read_done: bool, - overlapped: [enum_fields.len]windows.OVERLAPPED, - small_bufs: [enum_fields.len][128]u8, - active: struct { - count: math.IntFittingRange(0, enum_fields.len), - handles_buf: [enum_fields.len]windows.HANDLE, - stream_map: [enum_fields.len]StreamEnum, - - pub fn removeAt(self: *@This(), index: u32) void { - assert(index < self.count); - for (index + 1..self.count) |i| { - self.handles_buf[i - 1] = self.handles_buf[i]; - self.stream_map[i - 1] = self.stream_map[i]; - } - self.count -= 1; - } - }, - } else void, - - const Self = @This(); - - pub fn deinit(self: *Self) void { - const gpa = self.gpa; - if (is_windows) { - // cancel any pending IO to prevent clobbering OVERLAPPED value - for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| { - _ = windows.kernel32.CancelIo(h); - } - } - inline for (&self.readers) |*r| gpa.free(r.buffer); - self.* = undefined; - } - - pub fn poll(self: *Self) !bool { - if (is_windows) { - return pollWindows(self, null); - } else { - return pollPosix(self, null); - } - } - - pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool { - if (is_windows) { - return pollWindows(self, nanoseconds); - } else { - return pollPosix(self, nanoseconds); - } - } - - pub fn reader(self: *Self, which: StreamEnum) *Reader { - return &self.readers[@intFromEnum(which)]; - } - - pub fn toOwnedSlice(self: *Self, which: StreamEnum) error{OutOfMemory}![]u8 { - const gpa = self.gpa; - const r = reader(self, which); - if (r.seek == 0) { - const new = try gpa.realloc(r.buffer, r.end); - r.buffer = &.{}; - r.end = 0; - return new; - } - const new = try gpa.dupe(u8, r.buffered()); - gpa.free(r.buffer); - r.buffer = &.{}; - r.seek = 0; - r.end = 0; - return new; - } - - fn pollWindows(self: *Self, nanoseconds: ?u64) !bool { - const bump_amt = 512; - const gpa = self.gpa; - - if (!self.windows.first_read_done) { - var already_read_data = false; - for (0..enum_fields.len) |i| { - const handle = self.windows.active.handles_buf[i]; - switch (try windowsAsyncReadToFifoAndQueueSmallRead( - gpa, - handle, - &self.windows.overlapped[i], - &self.readers[i], - &self.windows.small_bufs[i], - bump_amt, - )) { - .populated, .empty => |state| { - if (state == .populated) already_read_data = true; - self.windows.active.handles_buf[self.windows.active.count] = handle; - self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i)); - self.windows.active.count += 1; - }, - .closed => {}, // don't add to the wait_objects list - .closed_populated => { - // don't add to the wait_objects list, but we did already get data - already_read_data = true; - }, - } - } - self.windows.first_read_done = true; - if (already_read_data) return true; - } - - while (true) { - if (self.windows.active.count == 0) return false; - - const status = windows.kernel32.WaitForMultipleObjects( - self.windows.active.count, - &self.windows.active.handles_buf, - 0, - if (nanoseconds) |ns| - @min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1) - else - windows.INFINITE, - ); - if (status == windows.WAIT_FAILED) - return windows.unexpectedError(windows.GetLastError()); - if (status == windows.WAIT_TIMEOUT) - return true; - - if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1) - unreachable; - - const active_idx = status - windows.WAIT_OBJECT_0; - - const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]); - const handle = self.windows.active.handles_buf[active_idx]; - - const overlapped = &self.windows.overlapped[stream_idx]; - const stream_reader = &self.readers[stream_idx]; - const small_buf = &self.windows.small_bufs[stream_idx]; - - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => { - self.windows.active.removeAt(active_idx); - continue; - }, - .aborted => unreachable, - }; - const buf = small_buf[0..num_bytes_read]; - const dest = try writableSliceGreedyAlloc(stream_reader, gpa, buf.len); - @memcpy(dest[0..buf.len], buf); - advanceBufferEnd(stream_reader, buf.len); - - switch (try windowsAsyncReadToFifoAndQueueSmallRead( - gpa, - handle, - overlapped, - stream_reader, - small_buf, - bump_amt, - )) { - .empty => {}, // irrelevant, we already got data from the small buffer - .populated => {}, - .closed, - .closed_populated, // identical, since we already got data from the small buffer - => self.windows.active.removeAt(active_idx), - } - return true; - } - } - - fn pollPosix(self: *Self, nanoseconds: ?u64) !bool { - const gpa = self.gpa; - // We ask for ensureUnusedCapacity with this much extra space. This - // has more of an effect on small reads because once the reads - // start to get larger the amount of space an ArrayList will - // allocate grows exponentially. - const bump_amt = 512; - - const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP; - - const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns| - std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32) - else - -1); - if (events_len == 0) { - for (self.poll_fds) |poll_fd| { - if (poll_fd.fd != -1) return true; - } else return false; - } - - var keep_polling = false; - for (&self.poll_fds, &self.readers) |*poll_fd, *r| { - // Try reading whatever is available before checking the error - // conditions. - // It's still possible to read after a POLL.HUP is received, - // always check if there's some data waiting to be read first. - if (poll_fd.revents & posix.POLL.IN != 0) { - const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt); - const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) { - error.BrokenPipe => 0, // Handle the same as EOF. - else => |e| return e, - }; - advanceBufferEnd(r, amt); - if (amt == 0) { - // Remove the fd when the EOF condition is met. - poll_fd.fd = -1; - } else { - keep_polling = true; - } - } else if (poll_fd.revents & err_mask != 0) { - // Exclude the fds that signaled an error. - poll_fd.fd = -1; - } else if (poll_fd.fd != -1) { - keep_polling = true; - } - } - return keep_polling; - } - - /// Returns a slice into the unused capacity of `buffer` with at least - /// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary. - /// - /// After calling this function, typically the caller will follow up with a - /// call to `advanceBufferEnd` to report the actual number of bytes buffered. - fn writableSliceGreedyAlloc(r: *Reader, allocator: Allocator, min_len: usize) Allocator.Error![]u8 { - { - const unused = r.buffer[r.end..]; - if (unused.len >= min_len) return unused; - } - if (r.seek > 0) { - const data = r.buffer[r.seek..r.end]; - @memmove(r.buffer[0..data.len], data); - r.seek = 0; - r.end = data.len; - } - { - var list: std.ArrayList(u8) = .{ - .items = r.buffer[0..r.end], - .capacity = r.buffer.len, - }; - defer r.buffer = list.allocatedSlice(); - try list.ensureUnusedCapacity(allocator, min_len); - } - const unused = r.buffer[r.end..]; - assert(unused.len >= min_len); - return unused; - } - - /// After writing directly into the unused capacity of `buffer`, this function - /// updates `end` so that users of `Reader` can receive the data. - fn advanceBufferEnd(r: *Reader, n: usize) void { - assert(n <= r.buffer.len - r.end); - r.end += n; - } - - /// The `ReadFile` docuementation states that `lpNumberOfBytesRead` does not have a meaningful - /// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For - /// compatibility, we point it to this dummy variables, which we never otherwise access. - /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile - var win_dummy_bytes_read: u32 = undefined; - - /// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before - /// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data - /// is available. `handle` must have no pending asynchronous operation. - fn windowsAsyncReadToFifoAndQueueSmallRead( - gpa: Allocator, - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - r: *Reader, - small_buf: *[128]u8, - bump_amt: usize, - ) !enum { empty, populated, closed_populated, closed } { - var read_any_data = false; - while (true) { - const fifo_read_pending = while (true) { - const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt); - const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32); - - if (0 == windows.kernel32.ReadFile( - handle, - buf.ptr, - buf_len, - &win_dummy_bytes_read, - overlapped, - )) switch (windows.GetLastError()) { - .IO_PENDING => break true, - .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, - else => |err| return windows.unexpectedError(err), - }; - - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => unreachable, - }; - - read_any_data = true; - advanceBufferEnd(r, num_bytes_read); - - if (num_bytes_read == buf_len) { - // We filled the buffer, so there's probably more data available. - continue; - } else { - // We didn't fill the buffer, so assume we're out of data. - // There is no pending read. - break false; - } - }; - - if (fifo_read_pending) cancel_read: { - // Cancel the pending read into the FIFO. - _ = windows.kernel32.CancelIo(handle); - - // We have to wait for the handle to be signalled, i.e. for the cancelation to complete. - switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) { - windows.WAIT_OBJECT_0 => {}, - windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()), - else => unreachable, - } - - // If it completed before we canceled, make sure to tell the FIFO! - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => break :cancel_read, - }; - read_any_data = true; - advanceBufferEnd(r, num_bytes_read); - } - - // Try to queue the 1-byte read. - if (0 == windows.kernel32.ReadFile( - handle, - small_buf, - small_buf.len, - &win_dummy_bytes_read, - overlapped, - )) switch (windows.GetLastError()) { - .IO_PENDING => { - // 1-byte read pending as intended - return if (read_any_data) .populated else .empty; - }, - .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, - else => |err| return windows.unexpectedError(err), - }; - - // We got data back this time. Write it to the FIFO and run the main loop again. - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => unreachable, - }; - const buf = small_buf[0..num_bytes_read]; - const dest = try writableSliceGreedyAlloc(r, gpa, buf.len); - @memcpy(dest[0..buf.len], buf); - advanceBufferEnd(r, buf.len); - read_any_data = true; - } - } - - /// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation. - /// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected). - /// - /// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the - /// operation immediately returns data: - /// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially - /// erroneous results." - /// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...] - /// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to - /// get the actual number of bytes read." - /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile - fn windowsGetReadResult( - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - allow_aborted: bool, - ) !union(enum) { - success: u32, - closed, - aborted, - } { - var num_bytes_read: u32 = undefined; - if (0 == windows.kernel32.GetOverlappedResult( - handle, - overlapped, - &num_bytes_read, - 0, - )) switch (windows.GetLastError()) { - .BROKEN_PIPE => return .closed, - .OPERATION_ABORTED => |err| if (allow_aborted) { - return .aborted; - } else { - return windows.unexpectedError(err); - }, - else => |err| return windows.unexpectedError(err), - }; - return .{ .success = num_bytes_read }; - } - }; -} - -/// Given an enum, returns a struct with fields of that enum, each field -/// representing an I/O stream for polling. -pub fn PollFiles(comptime StreamEnum: type) type { - return @Struct(.auto, null, std.meta.fieldNames(StreamEnum), &@splat(Io.File), &@splat(.{})); -} - userdata: ?*anyopaque, vtable: *const VTable, @@ -599,6 +149,11 @@ pub const VTable = struct { futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void, futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void, + operate: *const fn (?*anyopaque, Operation) Cancelable!Operation.Result, + batchAwaitAsync: *const fn (?*anyopaque, *Batch) Cancelable!void, + batchAwaitConcurrent: *const fn (?*anyopaque, *Batch, Timeout) Batch.AwaitConcurrentError!void, + batchCancel: *const fn (?*anyopaque, *Batch) void, + dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void, dirCreateDirPath: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirPathError!Dir.CreatePathStatus, dirCreateDirPathOpen: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions, Dir.OpenOptions) Dir.CreateDirPathOpenError!Dir, @@ -633,9 +188,7 @@ pub const VTable = struct { fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize, fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize, fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize, - /// Returns 0 on end of stream. - fileReadStreaming: *const fn (?*anyopaque, File, data: []const []u8) File.Reader.Error!usize, - /// Returns 0 on end of stream. + /// Returns 0 if reading at or past the end. fileReadPositional: *const fn (?*anyopaque, File, data: []const []u8, offset: u64) File.ReadPositionalError!usize, fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void, fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void, @@ -702,20 +255,247 @@ pub const VTable = struct { netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void, }; +pub const Operation = union(enum) { + file_read_streaming: FileReadStreaming, + + pub const Tag = @typeInfo(Operation).@"union".tag_type.?; + + /// May return 0 reads which is different than `error.EndOfStream`. + pub const FileReadStreaming = struct { + file: File, + data: []const []u8, + + pub const Error = UnendingError || error{EndOfStream}; + pub const UnendingError = error{ + InputOutput, + SystemResources, + /// Trying to read a directory file descriptor as if it were a file. + IsDir, + ConnectionResetByPeer, + /// File was not opened with read capability. + NotOpenForReading, + SocketUnconnected, + /// Non-blocking has been enabled, and reading from the file descriptor + /// would block. + WouldBlock, + /// In WASI, this error occurs when the file descriptor does + /// not hold the required rights to read from it. + AccessDenied, + /// Unable to read file due to lock. Depending on the `Io` implementation, + /// reading from a locked file may return this error, or may ignore the + /// lock. + LockViolation, + } || Io.UnexpectedError; + + pub const Result = usize; + }; + + pub const Result = Result: { + const operation_fields = @typeInfo(Operation).@"union".fields; + var field_names: [operation_fields.len][]const u8 = undefined; + var field_types: [operation_fields.len]type = undefined; + for (operation_fields, &field_names, &field_types) |field, *field_name, *field_type| { + field_name.* = field.name; + field_type.* = field.type.Error!field.type.Result; + } + break :Result @Union(.auto, Tag, &field_names, &field_types, &@splat(.{})); + }; + + pub const Storage = union { + unused: List.DoubleNode, + submission: Submission, + pending: Pending, + completion: Completion, + + pub const Submission = struct { + node: List.SingleNode, + operation: Operation, + }; + + pub const Pending = struct { + node: List.DoubleNode, + tag: Tag, + context: [3]usize, + }; + + pub const Completion = struct { + node: List.SingleNode, + result: Result, + }; + }; + + pub const OptionalIndex = enum(u32) { + none = std.math.maxInt(u32), + _, + + pub fn fromIndex(i: usize) OptionalIndex { + const oi: OptionalIndex = @enumFromInt(i); + assert(oi != .none); + return oi; + } + + pub fn toIndex(oi: OptionalIndex) u32 { + assert(oi != .none); + return @intFromEnum(oi); + } + }; + pub const List = struct { + head: OptionalIndex, + tail: OptionalIndex, + + pub const empty: List = .{ .head = .none, .tail = .none }; + + pub const SingleNode = struct { next: OptionalIndex }; + pub const DoubleNode = struct { prev: OptionalIndex, next: OptionalIndex }; + }; +}; + +/// Performs one `Operation`. +pub fn operate(io: Io, operation: Operation) Cancelable!Operation.Result { + return io.vtable.operate(io.userdata, operation); +} + +/// Submits many operations together without waiting for all of them to +/// complete. +/// +/// This is a low-level abstraction based on `Operation`. For a higher +/// level API that operates on `Future`, see `Select` and `Group`. +pub const Batch = struct { + storage: []Operation.Storage, + unused: Operation.List, + submissions: Operation.List, + pending: Operation.List, + completions: Operation.List, + context: ?*anyopaque, + + /// After calling this, it is safe to unconditionally defer a call to + /// `cancel`. + pub fn init(storage: []Operation.Storage) Batch { + var prev: Operation.OptionalIndex = .none; + for (storage, 0..) |*operation, index| { + operation.* = .{ .unused = .{ .prev = prev, .next = .fromIndex(index + 1) } }; + prev = .fromIndex(index); + } + storage[storage.len - 1].unused.next = .none; + return .{ + .storage = storage, + .unused = .{ + .head = .fromIndex(0), + .tail = .fromIndex(storage.len - 1), + }, + .submissions = .empty, + .pending = .empty, + .completions = .empty, + .context = null, + }; + } + + /// Adds an operation to be performed at the next await call. + /// Returns the index that will be returned by `next` after the operation completes. + /// Asserts that no more than `storage.len` operations are active at a time. + pub fn add(b: *Batch, operation: Operation) u32 { + const index = b.unused.next; + b.addAt(index.toIndex(), operation); + return index; + } + + /// Adds an operation to be performed at the next await call. + /// After the operation completes, `next` will return `index`. + /// Asserts that the operation at `index` is not active. + pub fn addAt(b: *Batch, index: u32, operation: Operation) void { + const storage = &b.storage[index]; + const unused = storage.unused; + switch (unused.prev) { + .none => b.unused.head = .none, + else => |prev_index| b.storage[prev_index.toIndex()].unused.next = unused.next, + } + switch (unused.next) { + .none => b.unused.tail = .none, + else => |next_index| b.storage[next_index.toIndex()].unused.prev = unused.prev, + } + + switch (b.submissions.tail) { + .none => b.submissions.head = .fromIndex(index), + else => |tail_index| b.storage[tail_index.toIndex()].submission.node.next = .fromIndex(index), + } + storage.* = .{ .submission = .{ .node = .{ .next = .none }, .operation = operation } }; + b.submissions.tail = .fromIndex(index); + } + + /// After calling `awaitAsync`, `awaitConcurrent`, or `cancel`, this + /// function iterates over the completed operations. + /// + /// Each completion returned from this function dequeues from the `Batch`. + /// It is not required to dequeue all completions before awaiting again. + pub fn next(b: *Batch) ?struct { index: u32, result: Operation.Result } { + const index = b.completions.head; + if (index == .none) return null; + const storage = &b.storage[index.toIndex()]; + const completion = storage.completion; + const next_index = completion.node.next; + b.completions.head = next_index; + if (next_index == .none) b.completions.tail = .none; + + const tail_index = b.unused.tail; + switch (tail_index) { + .none => b.unused.head = index, + else => b.storage[tail_index.toIndex()].unused.next = index, + } + storage.* = .{ .unused = .{ .prev = tail_index, .next = .none } }; + b.unused.tail = index; + return .{ .index = index.toIndex(), .result = completion.result }; + } + + /// Waits for at least one of the submitted operations to complete. After + /// this function returns the completed operations can be iterated with + /// `next`. + /// + /// This function provides opportunity for the implementation to introduce + /// concurrency into the batched operations, but unlike `awaitConcurrent`, + /// does not require it, and therefore cannot fail with + /// `error.ConcurrencyUnavailable`. + pub fn awaitAsync(b: *Batch, io: Io) Cancelable!void { + return io.vtable.batchAwaitAsync(io.userdata, b); + } + + pub const AwaitConcurrentError = ConcurrentError || Cancelable || Timeout.Error; + + /// Waits for at least one of the submitted operations to complete. After + /// this function returns the completed operations can be iterated with + /// `next`. + /// + /// Unlike `awaitAsync`, this function requires the implementation to + /// perform the operations concurrently and therefore can fail with + /// `error.ConcurrencyUnavailable`. + pub fn awaitConcurrent(b: *Batch, io: Io, timeout: Timeout) AwaitConcurrentError!void { + return io.vtable.batchAwaitConcurrent(io.userdata, b, timeout); + } + + /// Requests all pending operations to be interrupted, then waits for all + /// pending operations to complete. After this returns, the `Batch` is in a + /// well-defined state, ready to be iterated with `next`. Successfully + /// canceled operations will be absent from the iteration. Some operations + /// may have successfully completed regardless of the cancel request and + /// will appear in the iteration. + pub fn cancel(b: *Batch, io: Io) void { + return io.vtable.batchCancel(io.userdata, b); + } +}; + pub const Limit = enum(usize) { nothing = 0, - unlimited = std.math.maxInt(usize), + unlimited = math.maxInt(usize), _, - /// `std.math.maxInt(usize)` is interpreted to mean `.unlimited`. + /// `math.maxInt(usize)` is interpreted to mean `.unlimited`. pub fn limited(n: usize) Limit { return @enumFromInt(n); } - /// Any value grater than `std.math.maxInt(usize)` is interpreted to mean + /// Any value grater than `math.maxInt(usize)` is interpreted to mean /// `.unlimited`. pub fn limited64(n: u64) Limit { - return @enumFromInt(@min(n, std.math.maxInt(usize))); + return @enumFromInt(@min(n, math.maxInt(usize))); } pub fn countVec(data: []const []const u8) Limit { @@ -929,9 +709,9 @@ pub const Clock = enum { }; } - pub fn compare(lhs: Clock.Timestamp, op: std.math.CompareOperator, rhs: Clock.Timestamp) bool { + pub fn compare(lhs: Clock.Timestamp, op: math.CompareOperator, rhs: Clock.Timestamp) bool { assert(lhs.clock == rhs.clock); - return std.math.compare(lhs.raw.nanoseconds, op, rhs.raw.nanoseconds); + return math.compare(lhs.raw.nanoseconds, op, rhs.raw.nanoseconds); } }; @@ -996,7 +776,7 @@ pub const Duration = struct { nanoseconds: i96, pub const zero: Duration = .{ .nanoseconds = 0 }; - pub const max: Duration = .{ .nanoseconds = std.math.maxInt(i96) }; + pub const max: Duration = .{ .nanoseconds = math.maxInt(i96) }; pub fn fromNanoseconds(x: i96) Duration { return .{ .nanoseconds = x }; @@ -1652,7 +1432,7 @@ pub const Event = enum(u32) { pub fn set(e: *Event, io: Io) void { switch (@atomicRmw(Event, e, .Xchg, .is_set, .release)) { .unset, .is_set => {}, - .waiting => io.futexWake(Event, e, std.math.maxInt(u32)), + .waiting => io.futexWake(Event, e, math.maxInt(u32)), } } diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig index e537755a33..e0297e0573 100644 --- a/lib/std/Io/File.zig +++ b/lib/std/Io/File.zig @@ -10,6 +10,18 @@ const assert = std.debug.assert; const Dir = std.Io.Dir; handle: Handle, +flags: Flags, + +pub const Flags = struct { + /// * true: + /// - windows: opened with MODE.IO.ASYNCHRONOUS + /// - POSIX: O_NONBLOCK is set + /// * false: + /// - windows: opened with SYNCHRONOUS_ALERT or SYNCHRONOUS_NONALERT, or + /// not a file. + /// - POSIX: O_NONBLOCK is unset + nonblocking: bool, +}; pub const Handle = std.posix.fd_t; @@ -18,6 +30,9 @@ pub const Writer = @import("File/Writer.zig"); pub const Atomic = @import("File/Atomic.zig"); /// Memory intended to remain consistent with file contents. pub const MemoryMap = @import("File/MemoryMap.zig"); +/// Concurrently read from multiple file streams, eliminating risk of +/// deadlocking. +pub const MultiReader = @import("File/MultiReader.zig"); pub const INode = std.posix.ino_t; pub const NLink = std.posix.nlink_t; @@ -77,9 +92,11 @@ pub fn stdout() File { return switch (native_os) { .windows => .{ .handle = std.os.windows.peb().ProcessParameters.hStdOutput, + .flags = .{ .nonblocking = false }, }, else => .{ .handle = std.posix.STDOUT_FILENO, + .flags = .{ .nonblocking = false }, }, }; } @@ -88,9 +105,11 @@ pub fn stderr() File { return switch (native_os) { .windows => .{ .handle = std.os.windows.peb().ProcessParameters.hStdError, + .flags = .{ .nonblocking = false }, }, else => .{ .handle = std.posix.STDERR_FILENO, + .flags = .{ .nonblocking = false }, }, }; } @@ -99,9 +118,11 @@ pub fn stdin() File { return switch (native_os) { .windows => .{ .handle = std.os.windows.peb().ProcessParameters.hStdInput, + .flags = .{ .nonblocking = false }, }, else => .{ .handle = std.posix.STDIN_FILENO, + .flags = .{ .nonblocking = false }, }, }; } @@ -549,12 +570,18 @@ pub fn setTimestampsNow(file: File, io: Io) SetTimestampsError!void { }); } +pub const ReadStreamingError = error{EndOfStream} || Reader.Error; + /// Returns 0 on stream end or if `buffer` has no space available for data. /// /// See also: /// * `reader` -pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usize { - return io.vtable.fileReadStreaming(io.userdata, file, buffer); +pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize { + const result = try io.operate(.{ .file_read_streaming = .{ + .file = file, + .data = buffer, + } }); + return result.file_read_streaming; } pub const ReadPositionalError = error{ @@ -562,7 +589,6 @@ pub const ReadPositionalError = error{ SystemResources, /// Trying to read a directory file descriptor as if it were a file. IsDir, - BrokenPipe, /// Non-blocking has been enabled, and reading from the file descriptor /// would block. WouldBlock, diff --git a/lib/std/Io/File/MultiReader.zig b/lib/std/Io/File/MultiReader.zig new file mode 100644 index 0000000000..217215a363 --- /dev/null +++ b/lib/std/Io/File/MultiReader.zig @@ -0,0 +1,269 @@ +const MultiReader = @This(); + +const std = @import("../../std.zig"); +const Io = std.Io; +const File = Io.File; +const Allocator = std.mem.Allocator; +const assert = std.debug.assert; + +gpa: Allocator, +streams: *Streams, +batch: Io.Batch, + +pub const Context = struct { + mr: *MultiReader, + fr: File.Reader, + vec: [1][]u8, + err: ?Error, +}; + +pub const Error = UnendingError || error{EndOfStream}; +pub const UnendingError = Allocator.Error || File.Reader.Error || Io.ConcurrentError; + +/// Trailing: +/// * `contexts: [len]Context` +/// * `storage: [len]Io.Operation.Storage` +pub const Streams = extern struct { + len: u32, + + pub fn contexts(s: *Streams) []Context { + const base: usize = @intFromPtr(s); + const ptr: [*]Context = @ptrFromInt(std.mem.alignForward(usize, base + @sizeOf(Streams), @alignOf(Context))); + return ptr[0..s.len]; + } + + pub fn storage(s: *Streams) []Io.Operation.Storage { + const prev = contexts(s); + const end = prev.ptr + prev.len; + const ptr: [*]Io.Operation.Storage = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(Io.Operation.Storage))); + return ptr[0..s.len]; + } +}; + +pub fn Buffer(comptime n: usize) type { + return extern struct { + len: u32, + contexts: [n][@sizeOf(Context)]u8 align(@alignOf(Context)), + storage: [n][@sizeOf(Io.Operation.Storage)]u8 align(@alignOf(Io.Operation.Storage)), + + pub fn toStreams(b: *@This()) *Streams { + b.len = n; + return @ptrCast(b); + } + }; +} + +/// See `Streams.Buffer` for convenience API to obtain the `streams` parameter. +pub fn init(mr: *MultiReader, gpa: Allocator, io: Io, streams: *Streams, files: []const File) void { + const contexts = streams.contexts(); + for (contexts, files) |*context, file| context.* = .{ + .mr = mr, + .fr = .{ + .io = io, + .file = file, + .mode = .streaming, + .interface = .{ + .vtable = &.{ + .stream = stream, + .discard = discard, + .readVec = readVec, + .rebase = rebase, + }, + .buffer = &.{}, + .seek = 0, + .end = 0, + }, + }, + .vec = .{&.{}}, + .err = null, + }; + mr.* = .{ + .gpa = gpa, + .streams = streams, + .batch = .init(streams.storage()), + }; + for (contexts, 0..) |*context, i| { + const r = &context.fr.interface; + rebaseGrowing(mr, context, 1) catch |err| { + context.err = err; + continue; + }; + context.vec[0] = r.buffer; + mr.batch.addAt(@intCast(i), .{ .file_read_streaming = .{ + .file = context.fr.file, + .data = &context.vec, + } }); + } +} + +pub fn deinit(mr: *MultiReader) void { + const gpa = mr.gpa; + const contexts = mr.streams.contexts(); + const io = contexts[0].fr.io; + mr.batch.cancel(io); + for (contexts) |*context| { + gpa.free(context.fr.interface.buffer); + } +} + +pub fn fileReader(mr: *MultiReader, index: usize) *File.Reader { + return &mr.streams.contexts()[index].fr; +} + +pub fn reader(mr: *MultiReader, index: usize) *Io.Reader { + return &mr.streams.contexts()[index].fr.interface; +} + +/// Checks for errors in all streams, prioritizing `error.Canceled` if it +/// occurred anywhere, and ignoring `error.EndOfStream`. +pub fn checkAnyError(mr: *const MultiReader) UnendingError!void { + const contexts = mr.streams.contexts(); + var other: UnendingError!void = {}; + for (contexts) |*context| { + if (context.err) |err| switch (err) { + error.Canceled => |e| return e, + error.EndOfStream => continue, + else => |e| other = e, + }; + } + return other; +} + +pub fn toOwnedSlice(mr: *MultiReader, index: usize) Allocator.Error![]u8 { + const gpa = mr.gpa; + const r: *Io.Reader = reader(mr, index); + if (r.seek == 0) { + const new = try gpa.realloc(r.buffer, r.end); + r.buffer = &.{}; + r.end = 0; + return new; + } + const new = try gpa.dupe(u8, r.buffered()); + gpa.free(r.buffer); + r.buffer = &.{}; + r.seek = 0; + r.end = 0; + return new; +} + +fn stream(r: *Io.Reader, w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!usize { + _ = limit; + _ = w; + const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r)); + const context: *Context = @fieldParentPtr("fr", fr); + try fillUntimed(context, 1); + return 0; +} + +fn discard(r: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize { + _ = limit; + const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r)); + const context: *Context = @fieldParentPtr("fr", fr); + try fillUntimed(context, 1); + return 0; +} + +fn readVec(r: *Io.Reader, data: [][]u8) Io.Reader.Error!usize { + _ = data; + const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r)); + const context: *Context = @fieldParentPtr("fr", fr); + try fillUntimed(context, 1); + return 0; +} + +fn rebase(r: *Io.Reader, capacity: usize) Io.Reader.RebaseError!void { + const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r)); + const context: *Context = @fieldParentPtr("fr", fr); + try fillUntimed(context, capacity); +} + +fn fillUntimed(context: *Context, capacity: usize) Io.Reader.Error!void { + fill(context.mr, capacity, .none) catch |err| switch (err) { + error.Timeout, error.UnsupportedClock => unreachable, + error.Canceled, error.ConcurrencyUnavailable => |e| { + context.err = e; + return error.ReadFailed; + }, + error.EndOfStream => |e| return e, + }; + if (context.err) |err| switch (err) { + error.EndOfStream => |e| return e, + else => return error.ReadFailed, + }; +} + +pub const FillError = Io.Batch.AwaitConcurrentError || error{ + /// `fill` was called when all streams already have failed or reached the + /// end. + EndOfStream, +}; + +/// Wait until at least one stream receives more data. +pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillError!void { + const contexts = mr.streams.contexts(); + const io = contexts[0].fr.io; + var any_completed = false; + + try mr.batch.awaitConcurrent(io, timeout); + + while (mr.batch.next()) |operation| { + any_completed = true; + const context = &contexts[operation.index]; + const n = operation.result.file_read_streaming catch |err| { + context.err = err; + continue; + }; + const r = &context.fr.interface; + r.end += n; + if (r.buffer.len - r.end < unused_capacity) { + rebaseGrowing(mr, context, r.bufferedLen() + unused_capacity) catch |err| { + context.err = err; + continue; + }; + assert(r.seek == 0); + } + context.vec[0] = r.buffer[r.end..]; + mr.batch.addAt(operation.index, .{ .file_read_streaming = .{ + .file = context.fr.file, + .data = &context.vec, + } }); + } + + if (!any_completed) return error.EndOfStream; +} + +/// Wait until all streams fail or reach the end. +pub fn fillRemaining(mr: *MultiReader, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void { + while (fill(mr, 1, timeout)) |_| {} else |err| switch (err) { + error.EndOfStream => return, + else => |e| return e, + } +} + +fn rebaseGrowing(mr: *MultiReader, context: *Context, capacity: usize) Allocator.Error!void { + const gpa = mr.gpa; + const r = &context.fr.interface; + if (r.buffer.len >= capacity) { + const data = r.buffer[r.seek..r.end]; + @memmove(r.buffer[0..data.len], data); + r.seek = 0; + r.end = data.len; + } else { + const adjusted_capacity = std.ArrayList(u8).growCapacity(capacity); + + if (r.seek == 0) { + if (gpa.remap(r.buffer, adjusted_capacity)) |new_memory| { + r.buffer = new_memory; + return; + } + } + + const data = r.buffer[r.seek..r.end]; + const new = try gpa.alloc(u8, adjusted_capacity); + @memcpy(new[0..data.len], data); + gpa.free(r.buffer); + r.buffer = new; + r.seek = 0; + r.end = data.len; + } +} diff --git a/lib/std/Io/File/Reader.zig b/lib/std/Io/File/Reader.zig index 2e0e192cb2..effd000df8 100644 --- a/lib/std/Io/File/Reader.zig +++ b/lib/std/Io/File/Reader.zig @@ -26,27 +26,7 @@ size_err: ?SizeError = null, seek_err: ?SeekError = null, interface: Io.Reader, -pub const Error = error{ - InputOutput, - SystemResources, - /// Trying to read a directory file descriptor as if it were a file. - IsDir, - BrokenPipe, - ConnectionResetByPeer, - /// File was not opened with read capability. - NotOpenForReading, - SocketUnconnected, - /// Non-blocking has been enabled, and reading from the file descriptor - /// would block. - WouldBlock, - /// In WASI, this error occurs when the file descriptor does - /// not hold the required rights to read from it. - AccessDenied, - /// Unable to read file due to lock. Depending on the `Io` implementation, - /// reading from a locked file may return this error, or may ignore the - /// lock. - LockViolation, -} || Io.Cancelable || Io.UnexpectedError; +pub const Error = Io.Operation.FileReadStreaming.UnendingError || Io.Cancelable; pub const SizeError = File.StatError || error{ /// Occurs if, for example, the file handle is a network socket and therefore does not have a size. @@ -300,14 +280,16 @@ fn readVecStreaming(r: *Reader, data: [][]u8) Io.Reader.Error!usize { const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, data); const dest = iovecs_buffer[0..dest_n]; assert(dest[0].len > 0); - const n = io.vtable.fileReadStreaming(io.userdata, r.file, dest) catch |err| { - r.err = err; - return error.ReadFailed; + const n = r.file.readStreaming(io, dest) catch |err| switch (err) { + error.EndOfStream => { + r.size = r.pos; + return error.EndOfStream; + }, + else => |e| { + r.err = e; + return error.ReadFailed; + }, }; - if (n == 0) { - r.size = r.pos; - return error.EndOfStream; - } r.pos += n; if (n > data_size) { r.interface.end += n - data_size; @@ -355,14 +337,16 @@ fn discard(io_reader: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize { const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, &data); const dest = iovecs_buffer[0..dest_n]; assert(dest[0].len > 0); - const n = io.vtable.fileReadStreaming(io.userdata, file, dest) catch |err| { - r.err = err; - return error.ReadFailed; + const n = file.readStreaming(io, dest) catch |err| switch (err) { + error.EndOfStream => { + r.size = r.pos; + return error.EndOfStream; + }, + else => |e| { + r.err = e; + return error.ReadFailed; + }, }; - if (n == 0) { - r.size = r.pos; - return error.EndOfStream; - } r.pos += n; if (n > data_size) { r.interface.end += n - data_size; diff --git a/lib/std/Io/Reader.zig b/lib/std/Io/Reader.zig index a2b70afc67..9ff025a637 100644 --- a/lib/std/Io/Reader.zig +++ b/lib/std/Io/Reader.zig @@ -127,9 +127,7 @@ pub const ShortError = error{ ReadFailed, }; -pub const RebaseError = error{ - EndOfStream, -}; +pub const RebaseError = Error; pub const failing: Reader = .{ .vtable = &.{ @@ -315,6 +313,27 @@ pub fn allocRemainingAlignedSentinel( } } +pub const AppendExactError = Allocator.Error || Error; + +/// Transfers exactly `n` bytes from the reader to the `ArrayList`. +/// +/// See also: +/// * `appendRemaining` +pub fn appendExact( + r: *Reader, + gpa: Allocator, + list: *ArrayList(u8), + n: usize, +) AppendExactError!void { + try list.ensureUnusedCapacity(gpa, n); + var a = std.Io.Writer.Allocating.fromArrayList(gpa, list); + defer list.* = a.toArrayList(); + streamExact(r, &a.writer, n) catch |err| switch (err) { + error.ReadFailed, error.EndOfStream => |e| return e, + error.WriteFailed => unreachable, + }; +} + /// Transfers all bytes from the current position to the end of the stream, up /// to `limit`, appending them to `list`. /// @@ -1381,7 +1400,7 @@ pub fn takeLeb128(r: *Reader, comptime T: type) TakeLeb128Error!T { } /// Ensures `capacity` data can be buffered without rebasing. -pub fn rebase(r: *Reader, capacity: usize) RebaseError!void { +pub fn rebase(r: *Reader, capacity: usize) Error!void { if (r.buffer.len - r.seek >= capacity) { @branchHint(.likely); return; @@ -1389,7 +1408,7 @@ pub fn rebase(r: *Reader, capacity: usize) RebaseError!void { return r.vtable.rebase(r, capacity); } -pub fn defaultRebase(r: *Reader, capacity: usize) RebaseError!void { +pub fn defaultRebase(r: *Reader, capacity: usize) Error!void { assert(r.buffer.len - r.seek < capacity); const data = r.buffer[r.seek..r.end]; @memmove(r.buffer[0..data.len], data); diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 918b0f43f9..e9b62ca5f5 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1255,6 +1255,32 @@ const AlertableSyscall = struct { assert(is_windows); } + fn start() Io.Cancelable!AlertableSyscall { + const thread = Thread.current orelse return .{ .thread = null }; + switch (thread.cancel_protection) { + .blocked => return .{ .thread = null }, + .unblocked => {}, + } + const old_status = thread.status.fetchOr(.{ + .cancelation = @enumFromInt(0b010), + .awaitable = .null, + }, .monotonic); + switch (old_status.cancelation) { + .parked => unreachable, + .blocked => unreachable, + .blocked_alertable => unreachable, + .blocked_canceling => unreachable, + .blocked_alertable_canceling => unreachable, + .none => return .{ .thread = thread }, // new status is `.blocked_alertable` + .canceling => { + // Status is unchanged (still `.canceling`)---change to `.canceled` before return. + thread.status.store(.{ .cancelation = .canceled, .awaitable = old_status.awaitable }, .monotonic); + return error.Canceled; + }, + .canceled => return .{ .thread = null }, // new status is `.canceled` (unchanged) + } + } + fn checkCancel(s: AlertableSyscall) Io.Cancelable!void { comptime assert(is_windows); const thread = s.thread orelse return; @@ -1314,8 +1340,17 @@ const AlertableSyscall = struct { } }; +fn waitForApcOrAlert() void { + const infinite_timeout: windows.LARGE_INTEGER = std.math.minInt(windows.LARGE_INTEGER); + _ = windows.ntdll.NtDelayExecution(windows.TRUE, &infinite_timeout); +} + const max_iovecs_len = 8; const splat_buffer_size = 64; +/// Happens to be the same number that matches maximum number of handles that +/// NtWaitForMultipleObjects accepts. We use this value also for poll() on +/// posix systems. +const poll_buffer_len = 64; const default_PATH = "/usr/local/bin:/bin/:/usr/bin"; comptime { @@ -1579,6 +1614,11 @@ pub fn io(t: *Threaded) Io { .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, + .operate = operate, + .batchAwaitAsync = batchAwaitAsync, + .batchAwaitConcurrent = batchAwaitConcurrent, + .batchCancel = batchCancel, + .dirCreateDir = dirCreateDir, .dirCreateDirPath = dirCreateDirPath, .dirCreateDirPathOpen = dirCreateDirPathOpen, @@ -1613,7 +1653,6 @@ pub fn io(t: *Threaded) Io { .fileWritePositional = fileWritePositional, .fileWriteFileStreaming = fileWriteFileStreaming, .fileWriteFilePositional = fileWriteFilePositional, - .fileReadStreaming = fileReadStreaming, .fileReadPositional = fileReadPositional, .fileSeekBy = fileSeekBy, .fileSeekTo = fileSeekTo, @@ -1739,6 +1778,11 @@ pub fn ioBasic(t: *Threaded) Io { .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, + .operate = operate, + .batchAwaitAsync = batchAwaitAsync, + .batchAwaitConcurrent = batchAwaitConcurrent, + .batchCancel = batchCancel, + .dirCreateDir = dirCreateDir, .dirCreateDirPath = dirCreateDirPath, .dirCreateDirPathOpen = dirCreateDirPathOpen, @@ -1773,7 +1817,6 @@ pub fn ioBasic(t: *Threaded) Io { .fileWritePositional = fileWritePositional, .fileWriteFileStreaming = fileWriteFileStreaming, .fileWriteFilePositional = fileWriteFilePositional, - .fileReadStreaming = fileReadStreaming, .fileReadPositional = fileReadPositional, .fileSeekBy = fileSeekBy, .fileSeekTo = fileSeekTo, @@ -2440,6 +2483,485 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void { Thread.futexWake(ptr, max_waiters); } +fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Operation.Result { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + switch (operation) { + .file_read_streaming => |o| return .{ + .file_read_streaming = fileReadStreaming(t, o.file, o.data) catch |err| switch (err) { + error.Canceled => |e| return e, + else => |e| e, + }, + }, + } +} + +fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + if (is_windows) { + batchAwaitWindows(b, false) catch |err| switch (err) { + error.ConcurrencyUnavailable => unreachable, // passed concurrency=false + else => |e| return e, + }; + const alertable_syscall = try AlertableSyscall.start(); + while (b.pending.head != .none and b.completions.head == .none) waitForApcOrAlert(); + alertable_syscall.finish(); + return; + } + if (native_os == .wasi and !builtin.link_libc) @panic("TODO"); + var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; + var poll_len: u32 = 0; + { + var index = b.submissions.head; + while (index != .none and poll_len < poll_buffer_len) { + const submission = &b.storage[index.toIndex()].submission; + switch (submission.operation) { + .file_read_streaming => |o| { + poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 }; + poll_len += 1; + }, + } + index = submission.node.next; + } + } + switch (poll_len) { + 0 => return, + 1 => {}, + else => while (true) { + const timeout_ms: i32 = t: { + if (b.completions.head != .none) { + // It is legal to call batchWait with already completed + // operations in the ring. In such case, we need to avoid + // blocking in the poll syscall, but we can still take this + // opportunity to find additional ready operations. + break :t 0; + } + const max_poll_ms = std.math.maxInt(i32); + break :t max_poll_ms; + }; + const syscall = try Syscall.start(); + const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms); + syscall.finish(); + switch (posix.errno(rc)) { + .SUCCESS => { + if (rc == 0) { + if (b.completions.head != .none) { + // Since there are already completions available in the + // queue, this is neither a timeout nor a case for + // retrying. + return; + } + continue; + } + var prev_index: Io.Operation.OptionalIndex = .none; + var index = b.submissions.head; + for (poll_buffer[0..poll_len]) |poll_entry| { + const storage = &b.storage[index.toIndex()]; + const submission = &storage.submission; + const next_index = submission.node.next; + if (poll_entry.revents != 0) { + const result = try operate(t, submission.operation); + + switch (prev_index) { + .none => b.submissions.head = next_index, + else => b.storage[prev_index.toIndex()].submission.node.next = next_index, + } + if (next_index == .none) b.submissions.tail = prev_index; + + switch (b.completions.tail) { + .none => b.completions.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, + } + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + b.completions.tail = index; + } else prev_index = index; + index = next_index; + } + assert(index == .none); + return; + }, + .INTR => continue, + else => break, + } + }, + } + { + var tail_index = b.completions.tail; + defer b.completions.tail = tail_index; + var index = b.submissions.head; + errdefer b.submissions.head = index; + while (index != .none) { + const storage = &b.storage[index.toIndex()]; + const submission = &storage.submission; + const next_index = submission.node.next; + const result = try operate(t, submission.operation); + + switch (tail_index) { + .none => b.completions.head = index, + else => b.storage[tail_index.toIndex()].completion.node.next = index, + } + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + tail_index = index; + index = next_index; + } + b.submissions = .{ .head = .none, .tail = .none }; + } +} + +fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + if (is_windows) { + const deadline: ?Io.Clock.Timestamp = timeout.toDeadline(ioBasic(t)) catch |err| switch (err) { + error.Unexpected => deadline: { + recoverableOsBugDetected(); + break :deadline .{ .raw = .{ .nanoseconds = 0 }, .clock = .awake }; + }, + error.UnsupportedClock => |e| return e, + }; + try batchAwaitWindows(b, true); + while (b.pending.head != .none and b.completions.head == .none) { + var delay_interval: windows.LARGE_INTEGER = interval: { + const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER); + break :interval t.deadlineToWindowsInterval(d) catch |err| switch (err) { + error.UnsupportedClock => |e| return e, + error.Unexpected => { + recoverableOsBugDetected(); + break :interval -1; + }, + }; + }; + const alertable_syscall = try AlertableSyscall.start(); + const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval); + alertable_syscall.finish(); + switch (delay_rc) { + .SUCCESS, .TIMEOUT => { + // The thread woke due to the timeout. Although spurious + // timeouts are OK, when no deadline is passed we must not + // return `error.Timeout`. + if (timeout != .none and b.completions.head == .none) return error.Timeout; + }, + else => {}, + } + } + return; + } + if (native_os == .wasi and !builtin.link_libc) @panic("TODO"); + var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; + var poll_storage: struct { + gpa: std.mem.Allocator, + b: *Io.Batch, + slice: []posix.pollfd, + len: u32, + + fn add(storage: *@This(), file: Io.File, events: @FieldType(posix.pollfd, "events")) Io.ConcurrentError!void { + const len = storage.len; + if (len == poll_buffer_len) { + const slice: []posix.pollfd = if (storage.b.context) |context| + @as([*]posix.pollfd, @ptrCast(@alignCast(context)))[0..storage.b.storage.len] + else allocation: { + const allocation = storage.gpa.alloc(posix.pollfd, storage.b.storage.len) catch + return error.ConcurrencyUnavailable; + storage.b.context = allocation.ptr; + break :allocation allocation; + }; + @memcpy(slice[0..poll_buffer_len], storage.slice); + } + storage.slice[len] = .{ + .fd = file.handle, + .events = events, + .revents = 0, + }; + storage.len = len + 1; + } + } = .{ .gpa = t.allocator, .b = b, .slice = &poll_buffer, .len = 0 }; + { + var index = b.submissions.head; + while (index != .none) { + const submission = &b.storage[index.toIndex()].submission; + switch (submission.operation) { + .file_read_streaming => |o| try poll_storage.add(o.file, posix.POLL.IN), + } + index = submission.node.next; + } + } + switch (poll_storage.len) { + 0 => return, + 1 => if (timeout == .none) { + const index = b.submissions.head; + const storage = &b.storage[index.toIndex()]; + const result = try operate(t, storage.submission.operation); + + b.submissions = .{ .head = .none, .tail = .none }; + + switch (b.completions.tail) { + .none => b.completions.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, + } + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + b.completions.tail = index; + return; + }, + else => {}, + } + const t_io = ioBasic(t); + const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock; + while (true) { + const timeout_ms: i32 = t: { + if (b.completions.head != .none) { + // It is legal to call batchWait with already completed + // operations in the ring. In such case, we need to avoid + // blocking in the poll syscall, but we can still take this + // opportunity to find additional ready operations. + break :t 0; + } + const d = deadline orelse break :t -1; + const duration = d.durationFromNow(t_io) catch return error.UnsupportedClock; + if (duration.raw.nanoseconds <= 0) return error.Timeout; + const max_poll_ms = std.math.maxInt(i32); + break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds())); + }; + const syscall = try Syscall.start(); + const rc = posix.system.poll(&poll_buffer, poll_storage.len, timeout_ms); + syscall.finish(); + switch (posix.errno(rc)) { + .SUCCESS => { + if (rc == 0) { + if (b.completions.head != .none) { + // Since there are already completions available in the + // queue, this is neither a timeout nor a case for + // retrying. + return; + } + // Although spurious timeouts are OK, when no deadline is + // passed we must not return `error.Timeout`. + if (deadline == null) continue; + return error.Timeout; + } + var prev_index: Io.Operation.OptionalIndex = .none; + var index = b.submissions.head; + for (poll_storage.slice[0..poll_storage.len]) |poll_entry| { + const submission = &b.storage[index.toIndex()].submission; + const next_index = submission.node.next; + if (poll_entry.revents != 0) { + const result = try operate(t, submission.operation); + + switch (prev_index) { + .none => b.submissions.head = next_index, + else => b.storage[prev_index.toIndex()].submission.node.next = next_index, + } + if (next_index == .none) b.submissions.tail = prev_index; + + switch (b.completions.tail) { + .none => b.completions.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, + } + b.completions.tail = index; + b.storage[index.toIndex()] = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + } else prev_index = index; + index = next_index; + } + assert(index == .none); + return; + }, + .INTR => continue, + else => return error.ConcurrencyUnavailable, + } + } +} + +const WindowsBatchPendingOperationContext = extern struct { + file: windows.HANDLE, + iosb: windows.IO_STATUS_BLOCK, + + const Erased = [3]usize; + + comptime { + assert(@sizeOf(Erased) <= @sizeOf(WindowsBatchPendingOperationContext)); + } + + fn toErased(context: *WindowsBatchPendingOperationContext) *Erased { + return @ptrCast(context); + } + + fn fromErased(erased: *Erased) *WindowsBatchPendingOperationContext { + return @ptrCast(erased); + } +}; + +fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + { + var tail_index = b.unused.tail; + defer b.unused.tail = tail_index; + var index = b.submissions.head; + errdefer b.submissions.head = index; + while (index != .none) { + const next_index = b.storage[index.toIndex()].submission.node.next; + switch (tail_index) { + .none => b.unused.head = index, + else => b.storage[tail_index.toIndex()].unused.next = index, + } + b.storage[index.toIndex()] = .{ .unused = .{ .prev = tail_index, .next = .none } }; + tail_index = index; + index = next_index; + } + b.submissions = .{ .head = .none, .tail = .none }; + } + if (is_windows) { + var index = b.pending.head; + while (index != .none) { + const pending = &b.storage[index.toIndex()].pending; + const context: *WindowsBatchPendingOperationContext = .fromErased(&pending.context); + var cancel_iosb: windows.IO_STATUS_BLOCK = undefined; + _ = windows.ntdll.NtCancelIoFileEx(context.file, &context.iosb, &cancel_iosb); + index = pending.node.next; + } + while (b.pending.head != .none) waitForApcOrAlert(); + } else if (b.context) |context| { + t.allocator.free(@as([*]posix.pollfd, @ptrCast(@alignCast(context)))[0..b.storage.len]); + b.context = null; + } + assert(b.pending.head == .none); +} + +fn batchApc(apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void { + const b: *Io.Batch = @ptrCast(@alignCast(apc_context)); + const context: *WindowsBatchPendingOperationContext = @fieldParentPtr("iosb", iosb); + const erased_context = context.toErased(); + const pending: *Io.Operation.Storage.Pending = @fieldParentPtr("context", erased_context); + switch (pending.node.prev) { + .none => b.pending.head = pending.node.next, + else => |prev_index| b.storage[prev_index.toIndex()].pending.node.next = pending.node.next, + } + switch (pending.node.next) { + .none => b.pending.tail = pending.node.prev, + else => |next_index| b.storage[next_index.toIndex()].pending.node.prev = pending.node.prev, + } + const storage: *Io.Operation.Storage = @fieldParentPtr("pending", pending); + const index = storage - b.storage.ptr; + switch (iosb.u.Status) { + .CANCELLED => { + const tail_index = b.unused.tail; + switch (tail_index) { + .none => b.unused.head = .fromIndex(index), + else => b.storage[tail_index.toIndex()].unused.next = .fromIndex(index), + } + storage.* = .{ .unused = .{ .prev = tail_index, .next = .none } }; + b.unused.tail = .fromIndex(index); + }, + else => { + switch (b.completions.tail) { + .none => b.completions.head = .fromIndex(index), + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = .fromIndex(index), + } + b.completions.tail = .fromIndex(index); + const result: Io.Operation.Result = switch (pending.tag) { + .file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) }, + }; + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + }, + } +} + +/// If `concurrency` is false, `error.ConcurrencyUnavailable` is unreachable. +fn batchAwaitWindows(b: *Io.Batch, concurrency: bool) error{ Canceled, ConcurrencyUnavailable }!void { + var index = b.submissions.head; + errdefer b.submissions.head = index; + while (index != .none) { + const storage = &b.storage[index.toIndex()]; + const submission = storage.submission; + storage.* = .{ .pending = .{ + .node = .{ .prev = b.pending.tail, .next = .none }, + .tag = submission.operation, + .context = undefined, + } }; + switch (b.pending.tail) { + .none => b.pending.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].pending.node.next = index, + } + b.pending.tail = index; + const context: *WindowsBatchPendingOperationContext = .fromErased(&storage.pending.context); + errdefer { + context.iosb.u.Status = .CANCELLED; + batchApc(b, &context.iosb, 0); + } + switch (submission.operation) { + .file_read_streaming => |o| o: { + var data_index: usize = 0; + while (o.data.len - data_index != 0 and o.data[data_index].len == 0) data_index += 1; + if (o.data.len - data_index == 0) { + context.iosb = .{ + .u = .{ .Status = .SUCCESS }, + .Information = 0, + }; + batchApc(b, &context.iosb, 0); + break :o; + } + const buffer = o.data[data_index]; + const short_buffer_len = @min(std.math.maxInt(u32), buffer.len); + + if (o.file.flags.nonblocking) { + context.file = o.file.handle; + switch (windows.ntdll.NtReadFile( + o.file.handle, + null, // event + &batchApc, + b, + &context.iosb, + buffer.ptr, + short_buffer_len, + null, // byte offset + null, // key + )) { + .PENDING, .SUCCESS => {}, + .CANCELLED => unreachable, + else => |status| { + context.iosb.u.Status = status; + batchApc(b, &context.iosb, 0); + }, + } + } else { + if (concurrency) return error.ConcurrencyUnavailable; + + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtReadFile( + o.file.handle, + null, // event + null, // APC routine + null, // APC context + &context.iosb, + buffer.ptr, + short_buffer_len, + null, // byte offset + null, // key + )) { + .PENDING => unreachable, // unrecoverable: wrong File nonblocking flag + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |status| { + syscall.finish(); + + context.iosb.u.Status = status; + batchApc(b, &context.iosb, 0); + break; + }, + }; + } + }, + } + index = submission.node.next; + } + b.submissions = .{ .head = .none, .tail = .none }; +} + +fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void { + const ct = complete_tail.*; + const len: u31 = @intCast(ring.len); + ring[ct.index(len)] = op; + complete_tail.* = ct.next(len); +} + const dirCreateDir = switch (native_os) { .windows => dirCreateDirWindows, .wasi => dirCreateDirWasi, @@ -2759,8 +3281,10 @@ fn dirCreateDirPathOpenWasi( fn dirStat(userdata: ?*anyopaque, dir: Dir) Dir.StatError!Dir.Stat { const t: *Threaded = @ptrCast(@alignCast(userdata)); - const file: File = .{ .handle = dir.handle }; - return fileStat(t, file); + return fileStat(t, .{ + .handle = dir.handle, + .flags = .{ .nonblocking = false }, + }); } const dirStatFile = switch (native_os) { @@ -3552,7 +4076,10 @@ fn dirCreateFilePosix( } } - return .{ .handle = fd }; + return .{ + .handle = fd, + .flags = .{ .nonblocking = false }, + }; } fn dirCreateFileWindows( @@ -3682,7 +4209,10 @@ fn dirCreateFileWindows( errdefer windows.CloseHandle(handle); const exclusive = switch (flags.lock) { - .none => return .{ .handle = handle }, + .none => return .{ + .handle = handle, + .flags = .{ .nonblocking = false }, + }, .shared => false, .exclusive => true, }; @@ -3702,7 +4232,10 @@ fn dirCreateFileWindows( )) { .SUCCESS => { syscall.finish(); - return .{ .handle = handle }; + return .{ + .handle = handle, + .flags = .{ .nonblocking = false }, + }; }, .INSUFFICIENT_RESOURCES => return syscall.fail(error.SystemResources), .LOCK_NOT_GRANTED => return syscall.fail(error.WouldBlock), @@ -3751,7 +4284,10 @@ fn dirCreateFileWasi( switch (wasi.path_open(dir.handle, lookup_flags, sub_path.ptr, sub_path.len, oflags, base, inheriting, fdflags, &fd)) { .SUCCESS => { syscall.finish(); - return .{ .handle = fd }; + return .{ + .handle = fd, + .flags = .{ .nonblocking = false }, + }; }, .INTR => { try syscall.checkCancel(); @@ -3846,7 +4382,10 @@ fn dirCreateFileAtomic( .SUCCESS => { syscall.finish(); return .{ - .file = .{ .handle = @intCast(rc) }, + .file = .{ + .handle = @intCast(rc), + .flags = .{ .nonblocking = false }, + }, .file_basename_hex = 0, .dest_sub_path = dest_path, .file_open = true, @@ -4054,7 +4593,10 @@ fn dirOpenFilePosix( if (!flags.allow_directory) { const is_dir = is_dir: { - const stat = fileStat(t, .{ .handle = fd }) catch |err| switch (err) { + const stat = fileStat(t, .{ + .handle = fd, + .flags = .{ .nonblocking = false }, + }) catch |err| switch (err) { // The directory-ness is either unknown or unknowable error.Streaming => break :is_dir false, else => |e| return e, @@ -4140,7 +4682,10 @@ fn dirOpenFilePosix( } } - return .{ .handle = fd }; + return .{ + .handle = fd, + .flags = .{ .nonblocking = false }, + }; } fn dirOpenFileWindows( @@ -4273,7 +4818,10 @@ pub fn dirOpenFileWtf16( errdefer w.CloseHandle(handle); const exclusive = switch (flags.lock) { - .none => return .{ .handle = handle }, + .none => return .{ + .handle = handle, + .flags = .{ .nonblocking = false }, + }, .shared => false, .exclusive => true, }; @@ -4296,7 +4844,10 @@ pub fn dirOpenFileWtf16( .ACCESS_VIOLATION => |err| return syscall.ntstatusBug(err), // bad io_status_block pointer else => |status| return syscall.unexpectedNtstatus(status), }; - return .{ .handle = handle }; + return .{ + .handle = handle, + .flags = .{ .nonblocking = false }, + }; } fn dirOpenFileWasi( @@ -4378,7 +4929,7 @@ fn dirOpenFileWasi( if (!flags.allow_directory) { const is_dir = is_dir: { - const stat = fileStat(t, .{ .handle = fd }) catch |err| switch (err) { + const stat = fileStat(t, .{ .handle = fd, .flags = .{ .nonblocking = false } }) catch |err| switch (err) { // The directory-ness is either unknown or unknowable error.Streaming => break :is_dir false, else => |e| return e, @@ -4388,7 +4939,10 @@ fn dirOpenFileWasi( if (is_dir) return error.IsDir; } - return .{ .handle = fd }; + return .{ + .handle = fd, + .flags = .{ .nonblocking = false }, + }; } const dirOpenDir = switch (native_os) { @@ -5277,7 +5831,7 @@ fn dirRealPathFileWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, fn realPathWindows(h_file: windows.HANDLE, out_buffer: []u8) File.RealPathError!usize { var wide_buf: [windows.PATH_MAX_WIDE]u16 = undefined; - // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks + // TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks try Thread.checkCancel(); const wide_slice = try windows.GetFinalPathNameByHandle(h_file, .{}, &wide_buf); @@ -8275,14 +8829,14 @@ fn fileClose(userdata: ?*anyopaque, files: []const File) void { for (files) |file| posix.close(file.handle); } -fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.Reader.Error!usize { +fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.ReadStreamingError!usize { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; if (is_windows) return fileReadStreamingWindows(file, data); return fileReadStreamingPosix(file, data); } -fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usize { +fn fileReadStreamingPosix(file: File, data: []const []u8) File.ReadStreamingError!usize { var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined; var i: usize = 0; for (data) |buf| { @@ -8303,28 +8857,24 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) { .SUCCESS => { syscall.finish(); + if (nread == 0) return error.EndOfStream; return nread; }, .INTR, .TIMEDOUT => { try syscall.checkCancel(); continue; }, - else => |e| { - syscall.finish(); - switch (e) { - .INVAL => |err| return errnoBug(err), - .FAULT => |err| return errnoBug(err), - .BADF => return error.IsDir, // File operation on directory. - .IO => return error.InputOutput, - .ISDIR => return error.IsDir, - .NOBUFS => return error.SystemResources, - .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketUnconnected, - .CONNRESET => return error.ConnectionResetByPeer, - .NOTCAPABLE => return error.AccessDenied, - else => |err| return posix.unexpectedErrno(err), - } - }, + .BADF => return syscall.fail(error.IsDir), // File operation on directory. + .IO => return syscall.fail(error.InputOutput), + .ISDIR => return syscall.fail(error.IsDir), + .NOBUFS => return syscall.fail(error.SystemResources), + .NOMEM => return syscall.fail(error.SystemResources), + .NOTCONN => return syscall.fail(error.SocketUnconnected), + .CONNRESET => return syscall.fail(error.ConnectionResetByPeer), + .NOTCAPABLE => return syscall.fail(error.AccessDenied), + .INVAL => |err| return syscall.errnoBug(err), + .FAULT => |err| return syscall.errnoBug(err), + else => |err| return syscall.unexpectedErrno(err), } } } @@ -8335,75 +8885,115 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz switch (posix.errno(rc)) { .SUCCESS => { syscall.finish(); + if (rc == 0) return error.EndOfStream; return @intCast(rc); }, .INTR, .TIMEDOUT => { try syscall.checkCancel(); continue; }, - else => |e| { + .BADF => { syscall.finish(); - switch (e) { - .INVAL => |err| return errnoBug(err), - .FAULT => |err| return errnoBug(err), - .AGAIN => return error.WouldBlock, - .BADF => { - if (native_os == .wasi) return error.IsDir; // File operation on directory. - return error.NotOpenForReading; - }, - .IO => return error.InputOutput, - .ISDIR => return error.IsDir, - .NOBUFS => return error.SystemResources, - .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketUnconnected, - .CONNRESET => return error.ConnectionResetByPeer, - else => |err| return posix.unexpectedErrno(err), - } + if (native_os == .wasi) return error.IsDir; // File operation on directory. + return error.NotOpenForReading; }, + .AGAIN => return syscall.fail(error.WouldBlock), + .IO => return syscall.fail(error.InputOutput), + .ISDIR => return syscall.fail(error.IsDir), + .NOBUFS => return syscall.fail(error.SystemResources), + .NOMEM => return syscall.fail(error.SystemResources), + .NOTCONN => return syscall.fail(error.SocketUnconnected), + .CONNRESET => return syscall.fail(error.ConnectionResetByPeer), + .INVAL => |err| return syscall.errnoBug(err), + .FAULT => |err| return syscall.errnoBug(err), + else => |err| return syscall.unexpectedErrno(err), } } } -fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!usize { - const DWORD = windows.DWORD; +fn fileReadStreamingWindows(file: File, data: []const []u8) File.ReadStreamingError!usize { var index: usize = 0; - while (index < data.len and data[index].len == 0) index += 1; - if (index == data.len) return 0; + while (data.len - index != 0 and data[index].len == 0) index += 1; + if (data.len - index == 0) return 0; const buffer = data[index]; - const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len); + const short_buffer_len = @min(std.math.maxInt(u32), buffer.len); - const syscall: Syscall = try .start(); - while (true) { - var n: DWORD = undefined; - if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, null) != 0) { - syscall.finish(); - return n; - } - switch (windows.GetLastError()) { - .IO_PENDING => |err| { - syscall.finish(); - return windows.errorBug(err); - }, - .OPERATION_ABORTED => { + var iosb: windows.IO_STATUS_BLOCK = undefined; + + if (!file.flags.nonblocking) { + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtReadFile( + file.handle, + null, // event + null, // APC routine + null, // APC context + &iosb, + buffer.ptr, + short_buffer_len, + null, // byte offset + null, // key + )) { + .PENDING => unreachable, // unrecoverable: wrong File nonblocking flag + .CANCELLED => { try syscall.checkCancel(); continue; }, - .BROKEN_PIPE, .HANDLE_EOF => { + else => |status| { syscall.finish(); - return 0; + iosb.u.Status = status; + return ntReadFileResult(&iosb); }, - .NETNAME_DELETED => if (is_debug) unreachable else return error.Unexpected, - .LOCK_VIOLATION => return syscall.fail(error.LockViolation), - .ACCESS_DENIED => return syscall.fail(error.AccessDenied), - .INVALID_HANDLE => if (is_debug) unreachable else return error.Unexpected, - // TODO: Determine if INVALID_FUNCTION is possible in more scenarios than just passing - // a handle to a directory. - .INVALID_FUNCTION => return syscall.fail(error.IsDir), - else => |err| { - syscall.finish(); - return windows.unexpectedError(err); - }, - } + }; + } + + var done: bool = false; + + switch (windows.ntdll.NtReadFile( + file.handle, + null, // event + flagApc, + &done, // APC context + &iosb, + buffer.ptr, + short_buffer_len, + null, // byte offset + null, // key + )) { + // We must wait for the APC routine. + .PENDING, .SUCCESS => while (!done) { + // Once we get here we must not return from the function until the + // operation completes, thereby releasing reference to io_status_block. + const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) { + error.Canceled => |e| { + var cancel_iosb: windows.IO_STATUS_BLOCK = undefined; + _ = windows.ntdll.NtCancelIoFileEx(file.handle, &iosb, &cancel_iosb); + while (!done) waitForApcOrAlert(); + return e; + }, + }; + waitForApcOrAlert(); + alertable_syscall.finish(); + }, + else => |status| iosb.u.Status = status, + } + return ntReadFileResult(&iosb); +} + +fn flagApc(userdata: ?*anyopaque, _: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void { + const flag: *bool = @ptrCast(userdata); + flag.* = true; +} + +fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize { + switch (io_status_block.u.Status) { + .PENDING => unreachable, + .CANCELLED => unreachable, + .SUCCESS => return io_status_block.Information, + .END_OF_FILE, .PIPE_BROKEN => return error.EndOfStream, + .INVALID_DEVICE_REQUEST => return error.IsDir, + .LOCK_NOT_GRANTED => return error.LockViolation, + .ACCESS_DENIED => return error.AccessDenied, + else => |status| return windows.unexpectedStatus(status), } } @@ -9037,7 +9627,7 @@ fn processExecutablePath(userdata: ?*anyopaque, out_buffer: []u8) process.Execut }; defer w.CloseHandle(h_file); - // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks + // TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks try Thread.checkCancel(); const wide_slice = try w.GetFinalPathNameByHandle(h_file, .{}, &path_name_w_buf.data); @@ -9359,6 +9949,7 @@ fn writeFileStreamingWindows( handle: windows.HANDLE, bytes: []const u8, ) File.Writer.Error!usize { + assert(bytes.len != 0); var bytes_written: windows.DWORD = undefined; const adjusted_len = std.math.lossyCast(u32, bytes.len); const syscall: Syscall = try .start(); @@ -10075,6 +10666,7 @@ fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp { fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); + if (timeout == .none) return; if (use_parking_sleep) return parking_sleep.sleep(try timeout.toDeadline(ioBasic(t))); if (native_os == .wasi) return sleepWasi(t, timeout); if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout); @@ -12707,7 +13299,7 @@ fn processSetCurrentDir(userdata: ?*anyopaque, dir: Dir) process.SetCurrentDirEr if (is_windows) { var dir_path_buffer: [windows.PATH_MAX_WIDE]u16 = undefined; - // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks + // TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks try Thread.checkCancel(); const dir_path = try windows.GetFinalPathNameByHandle(dir.handle, .{}, &dir_path_buffer); const path_len_bytes = std.math.cast(u16, dir_path.len * 2) orelse return error.NameTooLong; @@ -13898,15 +14490,15 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp .pid = pid, .err_fd = err_pipe[0], .stdin = switch (options.stdin) { - .pipe => .{ .handle = stdin_pipe[1] }, + .pipe => .{ .handle = stdin_pipe[1], .flags = .{ .nonblocking = false } }, else => null, }, .stdout = switch (options.stdout) { - .pipe => .{ .handle = stdout_pipe[0] }, + .pipe => .{ .handle = stdout_pipe[0], .flags = .{ .nonblocking = false } }, else => null, }, .stderr = switch (options.stderr) { - .pipe => .{ .handle = stderr_pipe[0] }, + .pipe => .{ .handle = stderr_pipe[0], .flags = .{ .nonblocking = false } }, else => null, }, }; @@ -14560,9 +15152,9 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro return .{ .id = piProcInfo.hProcess, .thread_handle = piProcInfo.hThread, - .stdin = if (g_hChildStd_IN_Wr) |h| .{ .handle = h } else null, - .stdout = if (g_hChildStd_OUT_Rd) |h| .{ .handle = h } else null, - .stderr = if (g_hChildStd_ERR_Rd) |h| .{ .handle = h } else null, + .stdin = if (g_hChildStd_IN_Wr) |h| .{ .handle = h, .flags = .{ .nonblocking = false } } else null, + .stdout = if (g_hChildStd_OUT_Rd) |h| .{ .handle = h, .flags = .{ .nonblocking = true } } else null, + .stderr = if (g_hChildStd_ERR_Rd) |h| .{ .handle = h, .flags = .{ .nonblocking = true } } else null, .request_resource_usage_statistics = options.request_resource_usage_statistics, }; } @@ -14607,7 +15199,7 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE { t.mutex.lock(); // Another thread might have won the race. defer t.mutex.unlock(); if (t.random_file.handle) |prev_handle| { - _ = windows.ntdll.NtClose(fresh_handle); + windows.CloseHandle(fresh_handle); return prev_handle; } else { t.random_file.handle = fresh_handle; @@ -15696,6 +16288,7 @@ fn progressParentFile(userdata: ?*anyopaque) std.Progress.ParentFileError!File { .pointer => @ptrFromInt(int), else => return error.UnsupportedOperation, }, + .flags = .{ .nonblocking = false }, }; } @@ -16375,7 +16968,7 @@ const parking_sleep = struct { /// Spurious wakeups are possible. /// /// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation. -fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void { +fn park(opt_deadline: ?Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void { comptime assert(use_parking_futex or use_parking_sleep); switch (native_os) { .windows => { @@ -16431,6 +17024,22 @@ fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) err } } +fn deadlineToWindowsInterval(t: *Io.Threaded, deadline: Io.Clock.Timestamp) Io.Clock.Error!windows.LARGE_INTEGER { + // ntdll only supports two combinations: + // * real-time (`.real`) sleeps with absolute deadlines + // * monotonic (`.awake`/`.boot`) sleeps with relative durations + switch (deadline.clock) { + .cpu_process, .cpu_thread => unreachable, // cannot sleep for CPU time + .real => { + return @intCast(@max(@divTrunc(deadline.raw.nanoseconds, 100), 0)); + }, + .awake, .boot => { + const duration = try deadline.durationFromNow(ioBasic(t)); + return @intCast(@min(@divTrunc(-duration.raw.nanoseconds, 100), -1)); + }, + } +} + const UnparkTid = switch (native_os) { // `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread handles? .windows => usize, diff --git a/lib/std/Io/Threaded/test.zig b/lib/std/Io/Threaded/test.zig index ffda1e7601..593580d1f6 100644 --- a/lib/std/Io/Threaded/test.zig +++ b/lib/std/Io/Threaded/test.zig @@ -188,8 +188,8 @@ test "cancel blocked read from pipe" { }), else => { const pipe = try std.Io.Threaded.pipe2(.{}); - read_end = .{ .handle = pipe[0] }; - write_end = .{ .handle = pipe[1] }; + read_end = .{ .handle = pipe[0], .flags = .{ .nonblocking = false } }; + write_end = .{ .handle = pipe[1], .flags = .{ .nonblocking = false } }; }, } defer { diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index 5ccc46778b..0fefc77a32 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -979,12 +979,13 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff if (main_parent == .unused) continue; const file: Io.File = .{ .handle = main_storage.getIpcFd() orelse continue, + .flags = .{ .nonblocking = true }, }; const opt_saved_metadata = findOld(file.handle, old_ipc_metadata_fds, old_ipc_metadata); var bytes_read: usize = 0; while (true) { const n = file.readStreaming(io, &.{pipe_buf[bytes_read..]}) catch |err| switch (err) { - error.WouldBlock => break, + error.WouldBlock, error.EndOfStream => break, else => |e| { std.log.debug("failed to read child progress data: {t}", .{e}); main_storage.completed_count = 0; @@ -992,7 +993,6 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff continue :main_loop; }, }; - if (n == 0) break; if (opt_saved_metadata) |m| { if (m.remaining_read_trash_bytes > 0) { assert(bytes_read == 0); diff --git a/lib/std/crypto/tls/Client.zig b/lib/std/crypto/tls/Client.zig index 44a73c344a..eeeb7d0537 100644 --- a/lib/std/crypto/tls/Client.zig +++ b/lib/std/crypto/tls/Client.zig @@ -336,10 +336,11 @@ pub fn init(input: *Reader, output: *Writer, options: Options) InitError!Client // Ensure the input buffer pointer is stable in this scope. input.rebase(tls.max_ciphertext_record_len) catch |err| switch (err) { error.EndOfStream => {}, // We have assurance the remainder of stream can be buffered. + error.ReadFailed => |e| return e, }; const record_header = input.peek(tls.record_header_len) catch |err| switch (err) { error.EndOfStream => return error.TlsConnectionTruncated, - error.ReadFailed => return error.ReadFailed, + error.ReadFailed => |e| return e, }; const record_ct = input.takeEnumNonexhaustive(tls.ContentType, .big) catch unreachable; // already peeked input.toss(2); // legacy_version diff --git a/lib/std/os/windows/kernel32.zig b/lib/std/os/windows/kernel32.zig index fe28e40cbb..b6785e4a33 100644 --- a/lib/std/os/windows/kernel32.zig +++ b/lib/std/os/windows/kernel32.zig @@ -188,9 +188,6 @@ pub extern "kernel32" fn PostQueuedCompletionStatus( lpOverlapped: ?*OVERLAPPED, ) callconv(.winapi) BOOL; -// TODO: -// GetOverlappedResultEx with bAlertable=false, which calls: GetStdHandle + WaitForSingleObjectEx. -// Uses the SwitchBack system to run implementations for older programs; Do we care about this? pub extern "kernel32" fn GetOverlappedResult( hFile: HANDLE, lpOverlapped: *OVERLAPPED, diff --git a/lib/std/os/windows/ntdll.zig b/lib/std/os/windows/ntdll.zig index f61cbbf5b8..d9e68e54f9 100644 --- a/lib/std/os/windows/ntdll.zig +++ b/lib/std/os/windows/ntdll.zig @@ -594,6 +594,13 @@ pub extern "ntdll" fn NtCancelSynchronousIoFile( IoStatusBlock: *IO_STATUS_BLOCK, ) callconv(.winapi) NTSTATUS; +/// This function has been observed to return SUCCESS on timeout on Windows 10 +/// and TIMEOUT on Wine 10.0. +/// +/// This function has been observed on Windows 11 such that positive interval +/// is real time, which can cause waits to be interrupted by changing system +/// time, however negative intervals are not affected by changes to system +/// time. pub extern "ntdll" fn NtDelayExecution( Alertable: BOOLEAN, DelayInterval: *const LARGE_INTEGER, @@ -606,6 +613,6 @@ pub extern "ntdll" fn NtCancelIoFileEx( ) callconv(.winapi) NTSTATUS; pub extern "ntdll" fn NtCancelIoFile( - handle: HANDLE, - iosbToCancel: *const IO_STATUS_BLOCK, + FileHandle: HANDLE, + IoStatusBlock: *IO_STATUS_BLOCK, ) callconv(.winapi) NTSTATUS; diff --git a/lib/std/posix/test.zig b/lib/std/posix/test.zig index 5838595fcf..e2a52473f3 100644 --- a/lib/std/posix/test.zig +++ b/lib/std/posix/test.zig @@ -126,8 +126,8 @@ test "pipe" { const io = testing.io; const fds = try std.Io.Threaded.pipe2(.{}); - const out: Io.File = .{ .handle = fds[0] }; - const in: Io.File = .{ .handle = fds[1] }; + const out: Io.File = .{ .handle = fds[0], .flags = .{ .nonblocking = false } }; + const in: Io.File = .{ .handle = fds[1], .flags = .{ .nonblocking = false } }; try in.writeStreamingAll(io, "hello"); var buf: [16]u8 = undefined; try expect((try out.readStreaming(io, &.{&buf})) == 5); @@ -150,7 +150,10 @@ test "memfd_create" { else => return error.SkipZigTest, } - const file: Io.File = .{ .handle = try posix.memfd_create("test", 0) }; + const file: Io.File = .{ + .handle = try posix.memfd_create("test", 0), + .flags = .{ .nonblocking = false }, + }; defer file.close(io); try file.writePositionalAll(io, "test", 0); diff --git a/lib/std/process.zig b/lib/std/process.zig index 8395882c16..3b5a0ecebd 100644 --- a/lib/std/process.zig +++ b/lib/std/process.zig @@ -453,14 +453,16 @@ pub fn spawnPath(io: Io, dir: Io.Dir, options: SpawnOptions) SpawnError!Child { return io.vtable.processSpawnPath(io.userdata, dir, options); } -pub const RunError = CurrentPathError || posix.ReadError || SpawnError || posix.PollError || error{ - StdoutStreamTooLong, - StderrStreamTooLong, -}; +pub const RunError = error{ + StreamTooLong, +} || SpawnError || Io.File.MultiReader.UnendingError || Io.Timeout.Error; pub const RunOptions = struct { argv: []const []const u8, - max_output_bytes: usize = 50 * 1024, + stderr_limit: Io.Limit = .unlimited, + stdout_limit: Io.Limit = .unlimited, + /// How many bytes to initially allocate for stderr and stdout. + reserve_amount: usize = 64, /// Set to change the current working directory when spawning the child process. cwd: ?[]const u8 = null, @@ -486,6 +488,7 @@ pub const RunOptions = struct { create_no_window: bool = true, /// Darwin-only. Disable ASLR for the child process. disable_aslr: bool = false, + timeout: Io.Timeout = .none, }; pub const RunResult = struct { @@ -513,22 +516,41 @@ pub fn run(gpa: Allocator, io: Io, options: RunOptions) RunError!RunResult { }); defer child.kill(io); - var stdout: std.ArrayList(u8) = .empty; - defer stdout.deinit(gpa); - var stderr: std.ArrayList(u8) = .empty; - defer stderr.deinit(gpa); + var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined; + var multi_reader: Io.File.MultiReader = undefined; + multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ child.stdout.?, child.stderr.? }); + defer multi_reader.deinit(); - try child.collectOutput(gpa, &stdout, &stderr, options.max_output_bytes); + const stdout_reader = multi_reader.reader(0); + const stderr_reader = multi_reader.reader(1); + + while (multi_reader.fill(options.reserve_amount, options.timeout)) |_| { + if (options.stdout_limit.toInt()) |limit| { + if (stdout_reader.buffered().len > limit) + return error.StreamTooLong; + } + if (options.stderr_limit.toInt()) |limit| { + if (stderr_reader.buffered().len > limit) + return error.StreamTooLong; + } + } else |err| switch (err) { + error.EndOfStream => {}, + else => |e| return e, + } + + try multi_reader.checkAnyError(); const term = try child.wait(io); - const owned_stdout = try stdout.toOwnedSlice(gpa); - errdefer gpa.free(owned_stdout); - const owned_stderr = try stderr.toOwnedSlice(gpa); + const stdout_slice = try multi_reader.toOwnedSlice(0); + errdefer gpa.free(stdout_slice); + + const stderr_slice = try multi_reader.toOwnedSlice(1); + errdefer gpa.free(stderr_slice); return .{ - .stdout = owned_stdout, - .stderr = owned_stderr, + .stdout = stdout_slice, + .stderr = stderr_slice, .term = term, }; } diff --git a/lib/std/process/Child.zig b/lib/std/process/Child.zig index 17e15f208d..c87d221a95 100644 --- a/lib/std/process/Child.zig +++ b/lib/std/process/Child.zig @@ -9,7 +9,6 @@ const process = std.process; const File = std.Io.File; const assert = std.debug.assert; const Allocator = std.mem.Allocator; -const ArrayList = std.ArrayList; pub const Id = switch (native_os) { .windows => std.os.windows.HANDLE, @@ -125,54 +124,3 @@ pub fn wait(child: *Child, io: Io) WaitError!Term { assert(child.id != null); return io.vtable.childWait(io.userdata, child); } - -/// Collect the output from the process's stdout and stderr. Will return once all output -/// has been collected. This does not mean that the process has ended. `wait` should still -/// be called to wait for and clean up the process. -/// -/// The process must have been started with stdout and stderr set to -/// `process.SpawnOptions.StdIo.pipe`. -pub fn collectOutput( - child: *const Child, - /// Used for `stdout` and `stderr`. - allocator: Allocator, - stdout: *ArrayList(u8), - stderr: *ArrayList(u8), - max_output_bytes: usize, -) !void { - var poller = std.Io.poll(allocator, enum { stdout, stderr }, .{ - .stdout = child.stdout.?, - .stderr = child.stderr.?, - }); - defer poller.deinit(); - - const stdout_r = poller.reader(.stdout); - stdout_r.buffer = stdout.allocatedSlice(); - stdout_r.seek = 0; - stdout_r.end = stdout.items.len; - - const stderr_r = poller.reader(.stderr); - stderr_r.buffer = stderr.allocatedSlice(); - stderr_r.seek = 0; - stderr_r.end = stderr.items.len; - - defer { - stdout.* = .{ - .items = stdout_r.buffer[0..stdout_r.end], - .capacity = stdout_r.buffer.len, - }; - stderr.* = .{ - .items = stderr_r.buffer[0..stderr_r.end], - .capacity = stderr_r.buffer.len, - }; - stdout_r.buffer = &.{}; - stderr_r.buffer = &.{}; - } - - while (try poller.poll()) { - if (stdout_r.bufferedLen() > max_output_bytes) - return error.StdoutStreamTooLong; - if (stderr_r.bufferedLen() > max_output_bytes) - return error.StderrStreamTooLong; - } -} diff --git a/lib/std/process/Preopens.zig b/lib/std/process/Preopens.zig index 8223c29f83..3baf696ee9 100644 --- a/lib/std/process/Preopens.zig +++ b/lib/std/process/Preopens.zig @@ -29,7 +29,10 @@ pub fn get(p: *const Preopens, name: []const u8) ?Resource { switch (native_os) { .wasi => { const index = p.map.getIndex(name) orelse return null; - if (index <= 2) return .{ .file = .{ .handle = @intCast(index) } }; + if (index <= 2) return .{ .file = .{ + .handle = @intCast(index), + .flags = .{ .nonblocking = false }, + } }; return .{ .dir = .{ .handle = @intCast(index) } }; }, else => { diff --git a/lib/std/zig/LibCInstallation.zig b/lib/std/zig/LibCInstallation.zig index 02b3df54dc..6a3f4b4813 100644 --- a/lib/std/zig/LibCInstallation.zig +++ b/lib/std/zig/LibCInstallation.zig @@ -268,7 +268,8 @@ fn findNativeIncludeDirPosix(self: *LibCInstallation, gpa: Allocator, io: Io, ar }); const run_res = std.process.run(gpa, io, .{ - .max_output_bytes = 1024 * 1024, + .stdout_limit = .limited(1024 * 1024), + .stderr_limit = .limited(1024 * 1024), .argv = argv.items, .environ_map = &environ_map, // Some C compilers, such as Clang, are known to rely on argv[0] to find the path @@ -584,7 +585,8 @@ fn ccPrintFileName(gpa: Allocator, io: Io, args: CCPrintFileNameOptions) ![]u8 { try argv.append(arg1); const run_res = std.process.run(gpa, io, .{ - .max_output_bytes = 1024 * 1024, + .stdout_limit = .limited(1024 * 1024), + .stderr_limit = .limited(1024 * 1024), .argv = argv.items, .environ_map = &environ_map, // Some C compilers, such as Clang, are known to rely on argv[0] to find the path diff --git a/lib/std/zig/system.zig b/lib/std/zig/system.zig index 5046e2f51b..b32b554dee 100644 --- a/lib/std/zig/system.zig +++ b/lib/std/zig/system.zig @@ -420,7 +420,6 @@ pub fn resolveTargetQuery(io: Io, query: Target.Query) DetectError!Target { error.Canceled => |e| return e, error.Unexpected => |e| return e, error.WouldBlock => return error.Unexpected, - error.BrokenPipe => return error.Unexpected, error.ConnectionResetByPeer => return error.Unexpected, error.NotOpenForReading => return error.Unexpected, error.SocketUnconnected => return error.Unexpected, diff --git a/src/Compilation.zig b/src/Compilation.zig index 98c1b56e38..6b6021ab3c 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -6873,6 +6873,7 @@ fn spawnZigRc( child_progress_node: std.Progress.Node, ) !void { const io = comp.io; + const gpa = comp.gpa; var node_name: std.ArrayList(u8) = .empty; defer node_name.deinit(arena); @@ -6887,55 +6888,69 @@ fn spawnZigRc( }); defer child.kill(io); - var poller = std.Io.poll(comp.gpa, enum { stdout, stderr }, .{ - .stdout = child.stdout.?, - .stderr = child.stderr.?, - }); - defer poller.deinit(); + var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined; + var multi_reader: Io.File.MultiReader = undefined; + multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ child.stdout.?, child.stderr.? }); + defer multi_reader.deinit(); - const stdout = poller.reader(.stdout); + const stdout = multi_reader.fileReader(0); + const MessageHeader = std.zig.Server.Message.Header; - poll: while (true) { - const MessageHeader = std.zig.Server.Message.Header; - while (stdout.buffered().len < @sizeOf(MessageHeader)) if (!try poller.poll()) break :poll; - const header = stdout.takeStruct(MessageHeader, .little) catch unreachable; - while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll; - const body = stdout.take(header.bytes_len) catch unreachable; + var eos_err: error{EndOfStream}!void = {}; + while (true) { + const header = stdout.interface.takeStruct(MessageHeader, .little) catch |err| switch (err) { + error.EndOfStream => break, + error.ReadFailed => return stdout.err.?, + }; + const body = stdout.interface.take(header.bytes_len) catch |err| switch (err) { + error.EndOfStream => |e| { + // Better to report the crash with stderr below, but we set + // this in case the child exits successfully while violating + // this protocol. + eos_err = e; + break; + }, + error.ReadFailed => return stdout.err.?, + }; switch (header.tag) { // We expect exactly one ErrorBundle, and if any error_bundle header is // sent then it's a fatal error. .error_bundle => { - const error_bundle = try std.zig.Server.allocErrorBundle(comp.gpa, body); + const error_bundle = try std.zig.Server.allocErrorBundle(gpa, body); return comp.failWin32ResourceWithOwnedBundle(win32_resource, error_bundle); }, else => {}, // ignore other messages } } - // Just in case there's a failure that didn't send an ErrorBundle (e.g. an error return trace) - const stderr = poller.reader(.stderr); + try multi_reader.fillRemaining(.none); + // Just in case there's a failure that didn't send an ErrorBundle (e.g. an error return trace) const term = child.wait(io) catch |err| { return comp.failWin32Resource(win32_resource, "unable to wait for {s} rc: {t}", .{ argv[0], err }); }; + const stderr = multi_reader.reader(1).buffered(); + switch (term) { .exited => |code| { if (code != 0) { - log.err("zig rc failed with stderr:\n{s}", .{stderr.buffered()}); + log.err("zig rc failed with stderr:\n{s}", .{stderr}); return comp.failWin32Resource(win32_resource, "zig rc exited with code {d}", .{code}); } }, .signal => |sig| { - log.err("zig rc signaled {t} with stderr:\n{s}", .{ sig, stderr.buffered() }); + log.err("zig rc signaled {t} with stderr:\n{s}", .{ sig, stderr }); return comp.failWin32Resource(win32_resource, "zig rc terminated unexpectedly", .{}); }, else => { - log.err("zig rc terminated with stderr:\n{s}", .{stderr.buffered()}); + log.err("zig rc terminated with stderr:\n{s}", .{stderr}); return comp.failWin32Resource(win32_resource, "zig rc terminated unexpectedly", .{}); }, } + + try eos_err; } pub fn tmpFilePath(comp: Compilation, ally: Allocator, suffix: []const u8) error{OutOfMemory}![]const u8 { diff --git a/src/codegen/c/Type.zig b/src/codegen/c/Type.zig index fb37b60580..0bcdb207fc 100644 --- a/src/codegen/c/Type.zig +++ b/src/codegen/c/Type.zig @@ -2389,7 +2389,7 @@ pub const Pool = struct { .nonstring = elem_ctype.isAnyChar() and switch (ptr_info.sentinel) { .none => true, .zero_u8 => false, - else => |sentinel| Value.fromInterned(sentinel).orderAgainstZero(zcu).compare(.neq), + else => |sentinel| !Value.fromInterned(sentinel).compareAllWithZero(.eq, zcu), }, }); }, @@ -2438,7 +2438,7 @@ pub const Pool = struct { .nonstring = elem_ctype.isAnyChar() and switch (array_info.sentinel) { .none => true, .zero_u8 => false, - else => |sentinel| Value.fromInterned(sentinel).orderAgainstZero(zcu).compare(.neq), + else => |sentinel| !Value.fromInterned(sentinel).compareAllWithZero(.eq, zcu), }, }); if (!kind.isParameter()) return array_ctype; diff --git a/src/link.zig b/src/link.zig index 6f19ec0e58..3af768a363 100644 --- a/src/link.zig +++ b/src/link.zig @@ -605,8 +605,8 @@ pub const File = struct { switch (base.tag) { .lld => assert(base.file == null), .elf, .macho, .wasm => { - if (base.file != null) return; dev.checkAny(&.{ .coff_linker, .elf_linker, .macho_linker, .plan9_linker, .wasm_linker }); + if (base.file != null) return; const emit = base.emit; if (base.child_pid) |pid| { if (builtin.os.tag == .windows) { @@ -645,6 +645,7 @@ pub const File = struct { base.file = try emit.root_dir.handle.openFile(io, emit.sub_path, .{ .mode = .read_write }); }, .elf2, .coff2 => if (base.file == null) { + dev.checkAny(&.{ .elf2_linker, .coff2_linker }); const mf = if (base.cast(.elf2)) |elf| &elf.mf else if (base.cast(.coff2)) |coff| @@ -657,7 +658,13 @@ pub const File = struct { base.file = mf.memory_map.file; try mf.ensureTotalCapacity(@intCast(mf.nodes.items[0].location().resolve(mf)[1])); }, - .c, .spirv => dev.checkAny(&.{ .c_linker, .spirv_linker }), + .c => if (base.file == null) { + dev.check(.c_linker); + base.file = try base.emit.root_dir.handle.openFile(io, base.emit.sub_path, .{ + .mode = .write_only, + }); + }, + .spirv => dev.check(.spirv_linker), .plan9 => unreachable, } } diff --git a/tools/doctest.zig b/tools/doctest.zig index 55b8ca7bfb..97a9a0be3f 100644 --- a/tools/doctest.zig +++ b/tools/doctest.zig @@ -201,7 +201,6 @@ fn printOutput( .argv = build_args.items, .cwd = tmp_dir_path, .environ_map = environ_map, - .max_output_bytes = max_doc_file_size, }); switch (result.term) { .exited => |exit_code| { @@ -257,7 +256,6 @@ fn printOutput( .argv = run_args, .environ_map = environ_map, .cwd = tmp_dir_path, - .max_output_bytes = max_doc_file_size, }); switch (result.term) { .exited => |exit_code| { @@ -376,7 +374,6 @@ fn printOutput( .argv = test_args.items, .environ_map = environ_map, .cwd = tmp_dir_path, - .max_output_bytes = max_doc_file_size, }); switch (result.term) { .exited => |exit_code| { @@ -432,7 +429,6 @@ fn printOutput( .argv = test_args.items, .environ_map = environ_map, .cwd = tmp_dir_path, - .max_output_bytes = max_doc_file_size, }); switch (result.term) { .exited => |exit_code| { @@ -508,7 +504,6 @@ fn printOutput( .argv = build_args.items, .environ_map = environ_map, .cwd = tmp_dir_path, - .max_output_bytes = max_doc_file_size, }); switch (result.term) { .exited => |exit_code| { @@ -1132,7 +1127,6 @@ fn run( .argv = args, .environ_map = environ_map, .cwd = cwd, - .max_output_bytes = max_doc_file_size, }); switch (result.term) { .exited => |exit_code| { diff --git a/tools/incr-check.zig b/tools/incr-check.zig index c9564f85c2..840faf6f27 100644 --- a/tools/incr-check.zig +++ b/tools/incr-check.zig @@ -28,6 +28,7 @@ fn logImpl( } pub fn main(init: std.process.Init) !void { + const gpa = init.gpa; const fatal = std.process.fatal; const arena = init.arena.allocator(); const io = init.io; @@ -224,11 +225,10 @@ pub fn main(init: std.process.Init) !void { .enable_darling = enable_darling, }; - var poller = Io.poll(arena, Eval.StreamEnum, .{ - .stdout = child.stdout.?, - .stderr = child.stderr.?, - }); - defer poller.deinit(); + var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined; + var multi_reader: Io.File.MultiReader = undefined; + multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ child.stdout.?, child.stderr.? }); + defer multi_reader.deinit(); for (case.updates) |update| { var update_node = target_prog_node.start(update.name, 0); @@ -243,10 +243,10 @@ pub fn main(init: std.process.Init) !void { eval.write(update); try eval.requestUpdate(); - try eval.check(&poller, update, update_node); + try eval.check(&multi_reader, update, update_node); } - try eval.end(&poller); + try eval.end(&multi_reader); waitChild(&child, &eval); } @@ -272,9 +272,6 @@ const Eval = struct { enable_wasmtime: bool, enable_darling: bool, - const StreamEnum = enum { stdout, stderr }; - const Poller = Io.Poller(StreamEnum); - /// Currently this function assumes the previous updates have already been written. fn write(eval: *Eval, update: Case.Update) void { const io = eval.io; @@ -293,23 +290,29 @@ const Eval = struct { } } - fn check(eval: *Eval, poller: *Poller, update: Case.Update, prog_node: std.Progress.Node) !void { + fn check(eval: *Eval, mr: *Io.File.MultiReader, update: Case.Update, prog_node: std.Progress.Node) !void { const arena = eval.arena; - const stdout = poller.reader(.stdout); - const stderr = poller.reader(.stderr); + const stdout = mr.fileReader(0); + const stderr = &mr.fileReader(1).interface; + const Header = std.zig.Server.Message.Header; - poll: while (true) { - const Header = std.zig.Server.Message.Header; - while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll; - const header = stdout.takeStruct(Header, .little) catch unreachable; - while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll; - const body = stdout.take(header.bytes_len) catch unreachable; + while (true) { + const header = stdout.interface.takeStruct(Header, .little) catch |err| switch (err) { + error.EndOfStream => break, + error.ReadFailed => return stdout.err.?, + }; + const body = stdout.interface.take(header.bytes_len) catch |err| switch (err) { + // If this panic triggers it might be helpful to rework this + // code to print the stderr from the abnormally terminated child. + error.EndOfStream => @panic("unexpected mid-message end of stream"), + error.ReadFailed => return stdout.err.?, + }; switch (header.tag) { .error_bundle => { const result_error_bundle = try std.zig.Server.allocErrorBundle(arena, body); if (stderr.bufferedLen() > 0) { - const stderr_data = try poller.toOwnedSlice(.stderr); + const stderr_data = try mr.toOwnedSlice(1); if (eval.allow_stderr) { std.log.info("error_bundle stderr:\n{s}", .{stderr_data}); } else { @@ -326,7 +329,7 @@ const Eval = struct { var r: std.Io.Reader = .fixed(body); _ = r.takeStruct(std.zig.Server.Message.EmitDigest, .little) catch unreachable; if (stderr.bufferedLen() > 0) { - const stderr_data = try poller.toOwnedSlice(.stderr); + const stderr_data = try mr.toOwnedSlice(1); if (eval.allow_stderr) { std.log.info("emit_digest stderr:\n{s}", .{stderr_data}); } else { @@ -358,11 +361,12 @@ const Eval = struct { } } - if (stderr.bufferedLen() > 0) { + const buffered_stderr = stderr.buffered(); + if (buffered_stderr.len > 0) { if (eval.allow_stderr) { - std.log.info("stderr:\n{s}", .{stderr.buffered()}); + std.log.info("stderr:\n{s}", .{buffered_stderr}); } else { - eval.fatal("unexpected stderr:\n{s}", .{stderr.buffered()}); + eval.fatal("unexpected stderr:\n{s}", .{buffered_stderr}); } } @@ -588,23 +592,27 @@ const Eval = struct { }; } - fn end(eval: *Eval, poller: *Poller) !void { + fn end(eval: *Eval, mr: *Io.File.MultiReader) !void { requestExit(eval.child, eval); - const stdout = poller.reader(.stdout); - const stderr = poller.reader(.stderr); + const stdout = mr.fileReader(0); + const Header = std.zig.Server.Message.Header; - poll: while (true) { - const Header = std.zig.Server.Message.Header; - while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll; - const header = stdout.takeStruct(Header, .little) catch unreachable; - while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll; - stdout.toss(header.bytes_len); + while (true) { + const header = stdout.interface.takeStruct(Header, .little) catch |err| switch (err) { + error.EndOfStream => break, + error.ReadFailed => return stdout.err.?, + }; + stdout.interface.discardAll(header.bytes_len) catch |err| switch (err) { + error.ReadFailed => return stdout.err.?, + error.EndOfStream => |e| return e, + }; } - if (stderr.bufferedLen() > 0) { - eval.fatal("unexpected stderr:\n{s}", .{stderr.buffered()}); - } + try mr.fillRemaining(.none); + + const stderr = mr.reader(1).buffered(); + if (stderr.len > 0) eval.fatal("unexpected stderr:\n{s}", .{stderr}); } fn buildCOutput(eval: *Eval, c_path: []const u8, out_path: []const u8, prog_node: std.Progress.Node) !void { diff --git a/tools/update_clang_options.zig b/tools/update_clang_options.zig index b52267a3fb..a073062d88 100644 --- a/tools/update_clang_options.zig +++ b/tools/update_clang_options.zig @@ -676,7 +676,6 @@ pub fn main(init: std.process.Init) !void { const child_result = try std.process.run(arena, io, .{ .argv = &child_args, - .max_output_bytes = 100 * 1024 * 1024, }); std.debug.print("{s}\n", .{child_result.stderr}); diff --git a/tools/update_cpu_features.zig b/tools/update_cpu_features.zig index 3041ee6acc..eaa6a9afd2 100644 --- a/tools/update_cpu_features.zig +++ b/tools/update_cpu_features.zig @@ -1987,7 +1987,6 @@ fn processOneTarget(io: Io, job: Job) void { const child_result = try std.process.run(arena, io, .{ .argv = &child_args, - .max_output_bytes = 500 * 1024 * 1024, }); tblgen_progress.end(); if (child_result.stderr.len != 0) {