diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig index c74286f61b..a2d678275a 100644 --- a/lib/std/Build/Step/Run.zig +++ b/lib/std/Build/Step/Run.zig @@ -1721,7 +1721,6 @@ fn evalZigTest( // a crash of some kind. Either way, the child will terminate by itself -- wait for it. const stderr_reader = multi_reader.reader(1); const stderr_owned = try arena.dupe(u8, stderr_reader.buffered()); - stderr_reader.tossBuffered(); // Clean up everything and wait for the child to exit. child.stdin.?.close(io); diff --git a/lib/std/Io.zig b/lib/std/Io.zig index a63a89e4ee..438fb01529 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -350,6 +350,8 @@ pub const Batch = struct { } }; + /// After calling this, it is safe to unconditionally defer a call to + /// `cancel`. pub fn init(operations: []Operation, ring: []u32) Batch { const len: u31 = @intCast(operations.len); assert(ring.len == len); @@ -408,12 +410,18 @@ pub const Batch = struct { /// Starts work on any submitted operations and returns when at least one has completed. /// /// Returns `error.Timeout` if `timeout` expires first. + /// + /// Depending on the `Io` implementation, may allocate resources that are + /// freed with `cancel`, even if an error is returned. pub fn wait(b: *Batch, io: Io, timeout: Timeout) WaitError!void { return io.vtable.batchWait(io.userdata, b, timeout); } /// Returns after all `operations` have completed. Operations which have not completed /// after this function returns were successfully dropped and had no side effects. + /// + /// This function is idempotent with respect to itself and `wait`. It is + /// safe to unconditionally `defer` a call to this function after `init`. 
pub fn cancel(b: *Batch, io: Io) void { return io.vtable.batchCancel(io.userdata, b); } diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index e60959abd0..0a17e58a67 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1255,6 +1255,32 @@ const AlertableSyscall = struct { assert(is_windows); } + fn start() Io.Cancelable!AlertableSyscall { + const thread = Thread.current orelse return .{ .thread = null }; + switch (thread.cancel_protection) { + .blocked => return .{ .thread = null }, + .unblocked => {}, + } + const old_status = thread.status.fetchOr(.{ + .cancelation = @enumFromInt(0b010), + .awaitable = .null, + }, .monotonic); + switch (old_status.cancelation) { + .parked => unreachable, + .blocked => unreachable, + .blocked_alertable => unreachable, + .blocked_canceling => unreachable, + .blocked_alertable_canceling => unreachable, + .none => return .{ .thread = thread }, // new status is `.blocked_alertable` + .canceling => { + // Status is unchanged (still `.canceling`)---change to `.canceled` before return. + thread.status.store(.{ .cancelation = .canceled, .awaitable = old_status.awaitable }, .monotonic); + return error.Canceled; + }, + .canceled => return .{ .thread = null }, // new status is `.canceled` (unchanged) + } + } + fn checkCancel(s: AlertableSyscall) Io.Cancelable!void { comptime assert(is_windows); const thread = s.thread orelse return; @@ -2501,10 +2527,10 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch. 
const op = ring[submit_head.index(len)]; const operation = &operations[op]; switch (operation.*) { - .noop => { - try operate(t, operation); - ring[complete_tail.index(len)] = op; - complete_tail = complete_tail.next(len); + .noop => |*o| { + _ = o.status.unstarted; + o.status = .{ .result = {} }; + submitComplete(ring, &complete_tail, op); }, .file_read_streaming => |*o| { _ = o.status.unstarted; @@ -2524,8 +2550,7 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch. 1 => if (timeout == .none) { const op = map_buffer[0]; try operate(t, &operations[op]); - ring[complete_tail.index(len)] = op; - complete_tail = complete_tail.next(len); + submitComplete(ring, &complete_tail, op); poll_i = 0; return; }, @@ -2560,8 +2585,7 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch. ring[submit_head.index(len)] = op; } else { try operate(t, &operations[op]); - ring[complete_tail.index(len)] = op; - complete_tail = complete_tail.next(len); + submitComplete(ring, &complete_tail, op); } } return; @@ -2584,19 +2608,49 @@ fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void { while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) { const op = ring[submit_head.index(len)]; switch (operations[op]) { - .noop => { - operate(t, &operations[op]) catch unreachable; - ring[complete_tail.index(len)] = op; - complete_tail = complete_tail.next(len); + .noop => |*o| { + _ = o.status.unstarted; + o.status = .{ .result = {} }; + submitComplete(ring, &complete_tail, op); }, .file_read_streaming => |*o| _ = o.status.unstarted, } } + if (is_windows) { + // Iterate over pending and issue cancelations, then free the allocation for IO_STATUS_BLOCK + if (b.impl.reserved) |reserved| { + const gpa = t.allocator; + const metadatas_ptr: [*]WinOpMetadata = @ptrCast(@alignCast(reserved)); + const metadatas = metadatas_ptr[0..b.operations.len]; + for (metadatas, 0..) 
|*metadata, op| { + const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING; + if (done) continue; + switch (operations[op]) { + .noop => unreachable, + .file_read_streaming => |*o| { + _ = windows.ntdll.NtCancelIoFile(o.file.handle, &metadata.iosb); + }, + } + } + for (metadatas) |*metadata| { + while (@atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) == .PENDING) { + waitForApcOrAlert(); + } + } + gpa.free(metadatas); + b.impl.reserved = null; + } + } b.impl.submit_head = submit_tail; b.impl.complete_tail = complete_tail; b.user.complete_tail = complete_tail; } +const WinOpMetadata = struct { + iosb: windows.IO_STATUS_BLOCK, + pending: bool, +}; + fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.WaitError!void { const operations = b.operations; const len: u31 = @intCast(operations.len); @@ -2606,16 +2660,16 @@ fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.Wa b.impl.submit_tail = submit_tail; var complete_tail = b.impl.complete_tail; - var overlapped_buffer: [poll_buffer_len]windows.OVERLAPPED = undefined; - var handles_buffer: [poll_buffer_len]windows.HANDLE = undefined; - var map_buffer: [poll_buffer_len]u32 = undefined; // handles_buffer index to operations index - var buffer_i: usize = 0; + const metadatas_ptr: [*]WinOpMetadata = if (b.impl.reserved) |reserved| @ptrCast(@alignCast(reserved)) else a: { + const gpa = t.allocator; + const metadatas = gpa.alloc(WinOpMetadata, operations.len) catch return error.ConcurrencyUnavailable; + b.impl.reserved = metadatas.ptr; + @memset(metadatas, .{ .iosb = undefined, .pending = false }); + break :a metadatas.ptr; + }; + const metadatas = metadatas_ptr[0..operations.len]; defer { - for (map_buffer[0..buffer_i]) |op| { - submit_head = submit_head.prev(len); - ring[submit_head.index(len)] = op; - } b.impl.submit_head = submit_head; b.impl.complete_tail = complete_tail; b.user.complete_tail = complete_tail; @@ 
-2624,74 +2678,76 @@ fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.Wa while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) { const op = ring[submit_head.index(len)]; const operation = &operations[op]; + const metadata = &metadatas[op]; + metadata.* = .{ .iosb = undefined, .pending = false }; switch (operation.*) { - .noop => { - try operate(t, operation); - ring[complete_tail.index(len)] = op; - complete_tail = complete_tail.next(len); + .noop => |*o| { + _ = o.status.unstarted; + o.status = .{ .result = {} }; + submitComplete(ring, &complete_tail, op); }, .file_read_streaming => |*o| { _ = o.status.unstarted; - if (handles_buffer.len - buffer_i == 0) return error.ConcurrencyUnavailable; - const overlapped = &overlapped_buffer[buffer_i]; - overlapped.* = .{ - .Internal = 0, - .InternalHigh = 0, - .DUMMYUNIONNAME = .{ .Pointer = null }, - .hEvent = null, - }; - var n: windows.DWORD = undefined; - const buf = o.data[0]; - const buf_len = std.math.lossyCast(windows.DWORD, buf.len); - if (windows.kernel32.ReadFile(o.file.handle, buf.ptr, buf_len, &n, overlapped) == 0) { - @panic("TODO"); + switch (try ntReadFile(o.file.handle, o.data, &metadata.iosb)) { + .status => { + o.status = .{ .result = ntReadFileResult(&metadata.iosb) }; + submitComplete(ring, &complete_tail, op); + }, + .pending => { + o.status = .{ .pending = b }; + metadata.pending = true; + }, } - handles_buffer[buffer_i] = o.file.handle; - map_buffer[buffer_i] = op; - buffer_i += 1; }, } } - switch (buffer_i) { - 0 => return, - 1 => if (timeout == .none) { - const op = map_buffer[0]; - try operate(t, &operations[op]); - ring[complete_tail.index(len)] = op; - complete_tail = complete_tail.next(len); - buffer_i = 0; - return; - }, - else => {}, - } + var delay_interval: windows.LARGE_INTEGER = timeoutToWindowsInterval(timeout); - const handles = handles_buffer[0..buffer_i]; - const map = map_buffer[0..buffer_i]; - - const syscall: Syscall = try .start(); 
- const index_result = windows.WaitForMultipleObjectsEx(handles, false, windows.INFINITE, true); - syscall.finish(); - const index = index_result catch |err| switch (err) { - error.Unexpected => @panic("TODO"), - error.WaitAbandoned => @panic("TODO"), - error.WaitTimeOut => @panic("TODO"), - }; - var n: windows.DWORD = undefined; - if (0 == windows.kernel32.GetOverlappedResult(handles[index], &overlapped_buffer[index], &n, 0)) { - switch (windows.GetLastError()) { - .BROKEN_PIPE => @panic("TODO"), - .OPERATION_ABORTED => @panic("TODO"), - else => @panic("TODO"), + while (true) { + const alertable_syscall = try AlertableSyscall.start(); + const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval); + alertable_syscall.finish(); + switch (delay_rc) { + .SUCCESS => { + // The thread woke due to the timeout. Although spurious + // timeouts are OK, when no deadline is passed we must not + // return `error.Timeout`. + if (timeout != .none) return error.Timeout; + }, + else => {}, } - } else switch (operations[map[index]]) { - .noop => unreachable, - .file_read_streaming => |*o| { - o.status = .{ .result = n }; - }, + var any_done = false; + var any_pending = false; + for (metadatas, 0..) 
|*metadata, op_usize| { + if (!metadata.pending) continue; + any_pending = true; + const op: u31 = @intCast(op_usize); + const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING; + switch (operations[op]) { + .noop => unreachable, + .file_read_streaming => |*o| { + assert(o.status.pending == b); + if (!done) continue; + o.status = .{ .result = ntReadFileResult(&metadata.iosb) }; + }, + } + any_done = true; + metadata.pending = false; + submitComplete(ring, &complete_tail, op); + } + if (any_done) return; + if (!any_pending) return; } } +fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void { + const ct = complete_tail.*; + const len: u31 = @intCast(ring.len); + ring[ct.index(len)] = op; + complete_tail.* = ct.next(len); +} + const dirCreateDir = switch (native_os) { .windows => dirCreateDirWindows, .wasi => dirCreateDirWasi, @@ -5529,7 +5585,7 @@ fn dirRealPathFileWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, fn realPathWindows(h_file: windows.HANDLE, out_buffer: []u8) File.RealPathError!usize { var wide_buf: [windows.PATH_MAX_WIDE]u16 = undefined; - // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks + // TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks try Thread.checkCancel(); const wide_slice = try windows.GetFinalPathNameByHandle(h_file, .{}, &wide_buf); @@ -8617,70 +8673,42 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz } fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!usize { - var index: usize = 0; - while (index < data.len and data[index].len == 0) index += 1; - if (index == data.len) return 0; - const buffer = data[index]; - var io_status_block: windows.IO_STATUS_BLOCK = undefined; - const syscall: Syscall = try .start(); - while (true) { - io_status_block.u.Status = .PENDING; - switch (windows.ntdll.NtReadFile( - file.handle, - null, // event - 
noopApc, // apc callback - null, // apc context - &io_status_block, - buffer.ptr, - @min(std.math.maxInt(u32), buffer.len), - null, // byte offset - null, // key - )) { - .SUCCESS, .END_OF_FILE, .PIPE_BROKEN => { - syscall.finish(); - return io_status_block.Information; - }, - .PENDING => break, - .CANCELLED => { - try syscall.checkCancel(); - continue; - }, - .INVALID_DEVICE_REQUEST => return syscall.fail(error.IsDir), - .LOCK_NOT_GRANTED => return syscall.fail(error.LockViolation), - .ACCESS_DENIED => return syscall.fail(error.AccessDenied), - .INVALID_PARAMETER => |err| return syscall.ntstatusBug(err), // streaming read of async mode file - else => |status| return syscall.unexpectedNtstatus(status), - } - } - { - // Once we get here we received PENDING so we must not return from the - // function until the operation completes. - defer while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) { - waitForApcOrAlert(); - }; + if (ntReadFile(file.handle, data, &io_status_block)) |result| switch (result) { + .status => return ntReadFileResult(&io_status_block), + .pending => { + // Once we get here we received PENDING so we must not return from the + // function until the operation completes. 
+ defer while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) { + waitForApcOrAlert(); + }; - const alertable_syscall = syscall.toAlertable() catch |err| switch (err) { - error.Canceled => |e| { - _ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block); - return e; - }, - }; - defer alertable_syscall.finish(); - waitForApcOrAlert(); - while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) { - alertable_syscall.checkCancel() catch |err| switch (err) { + const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) { error.Canceled => |e| { _ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block); return e; }, }; + defer alertable_syscall.finish(); waitForApcOrAlert(); - } - } + while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) { + alertable_syscall.checkCancel() catch |err| switch (err) { + error.Canceled => |e| { + _ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block); + return e; + }, + }; + waitForApcOrAlert(); + } + }, + } else |err| return err; + return ntReadFileResult(&io_status_block); +} + +fn ntReadFileResult(io_status_block: *windows.IO_STATUS_BLOCK) !usize { switch (io_status_block.u.Status) { .SUCCESS, .END_OF_FILE, .PIPE_BROKEN => return io_status_block.Information, - .PENDING => unreachable, // cannot return until the operation completes + .PENDING => unreachable, .INVALID_DEVICE_REQUEST => return error.IsDir, .LOCK_NOT_GRANTED => return error.LockViolation, .ACCESS_DENIED => return error.AccessDenied, @@ -8688,6 +8716,47 @@ fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!us } } +fn ntReadFile(handle: windows.HANDLE, data: []const []u8, iosb: *windows.IO_STATUS_BLOCK) Io.Cancelable!enum { status, pending } { + var index: usize = 0; + while (index < data.len and data[index].len == 0) index += 1; + if (index == data.len) { + iosb.u.Status = .SUCCESS; + iosb.Information = 0; + return 
.status; + } + const buffer = data[index]; + + const syscall: Syscall = try .start(); + while (true) { + iosb.u.Status = .PENDING; + switch (windows.ntdll.NtReadFile( + handle, + null, // event + noopApc, // apc callback + null, // apc context + iosb, + buffer.ptr, + @min(std.math.maxInt(u32), buffer.len), + null, // byte offset + null, // key + )) { + .PENDING => { + syscall.finish(); + return .pending; + }, + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |status| { + syscall.finish(); + iosb.u.Status = status; + return .status; + }, + } + } +} + fn fileReadPositionalPosix(file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize { if (!have_preadv) @compileError("TODO implement fileReadPositionalPosix for cursed operating systems that don't support preadv (it's only Haiku)"); @@ -9318,7 +9387,7 @@ fn processExecutablePath(userdata: ?*anyopaque, out_buffer: []u8) process.Execut }; defer w.CloseHandle(h_file); - // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks + // TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks try Thread.checkCancel(); const wide_slice = try w.GetFinalPathNameByHandle(h_file, .{}, &path_name_w_buf.data); @@ -12989,7 +13058,7 @@ fn processSetCurrentDir(userdata: ?*anyopaque, dir: Dir) process.SetCurrentDirEr if (is_windows) { var dir_path_buffer: [windows.PATH_MAX_WIDE]u16 = undefined; - // TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks + // TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks try Thread.checkCancel(); const dir_path = try windows.GetFinalPathNameByHandle(dir.handle, .{}, &dir_path_buffer); const path_len_bytes = std.math.cast(u16, dir_path.len * 2) orelse return error.NameTooLong; @@ -16657,7 +16726,7 @@ const parking_sleep = struct { /// Spurious wakeups are possible. 
/// /// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation. -fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void { +fn park(opt_deadline: ?Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void { comptime assert(use_parking_futex or use_parking_sleep); switch (native_os) { .windows => { @@ -16713,6 +16782,23 @@ fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) err } } +fn timeoutToWindowsInterval(timeout: Io.Timeout) windows.LARGE_INTEGER { + switch (timeout) { + .none => { + return std.math.minInt(windows.LARGE_INTEGER); // infinite timeout + }, + .deadline => |deadline| { + const nanoseconds = deadline.raw.nanoseconds; + return @intCast(@divTrunc(nanoseconds, 100)); + }, + .duration => |duration| { + const now_timestamp = nowWindows(duration.clock) catch unreachable; + const deadline_ns = now_timestamp.nanoseconds + duration.raw.nanoseconds; + return @intCast(@divTrunc(deadline_ns, 100)); + }, + } +} + const UnparkTid = switch (native_os) { // `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread handles? .windows => usize,