diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 6dc0e24731..506203418f 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -15,463 +15,13 @@ const Io = @This(); const builtin = @import("builtin"); -const is_windows = builtin.os.tag == .windows; const std = @import("std.zig"); -const windows = std.os.windows; -const posix = std.posix; const math = std.math; const assert = std.debug.assert; const Allocator = std.mem.Allocator; const Alignment = std.mem.Alignment; -pub fn poll( - gpa: Allocator, - comptime StreamEnum: type, - files: PollFiles(StreamEnum), -) Poller(StreamEnum) { - const enum_fields = @typeInfo(StreamEnum).@"enum".fields; - var result: Poller(StreamEnum) = .{ - .gpa = gpa, - .readers = @splat(.failing), - .poll_fds = undefined, - .windows = if (is_windows) .{ - .first_read_done = false, - .overlapped = [1]windows.OVERLAPPED{ - std.mem.zeroes(windows.OVERLAPPED), - } ** enum_fields.len, - .small_bufs = undefined, - .active = .{ - .count = 0, - .handles_buf = undefined, - .stream_map = undefined, - }, - } else {}, - }; - - inline for (enum_fields, 0..) |field, i| { - if (is_windows) { - result.windows.active.handles_buf[i] = @field(files, field.name).handle; - } else { - result.poll_fds[i] = .{ - .fd = @field(files, field.name).handle, - .events = posix.POLL.IN, - .revents = undefined, - }; - } - } - - return result; -} - -pub fn Poller(comptime StreamEnum: type) type { - return struct { - const enum_fields = @typeInfo(StreamEnum).@"enum".fields; - const PollFd = if (is_windows) void else posix.pollfd; - - gpa: Allocator, - readers: [enum_fields.len]Reader, - poll_fds: [enum_fields.len]PollFd, - windows: if (is_windows) struct { - first_read_done: bool, - overlapped: [enum_fields.len]windows.OVERLAPPED, - small_bufs: [enum_fields.len][128]u8, - active: struct { - count: math.IntFittingRange(0, enum_fields.len), - handles_buf: [enum_fields.len]windows.HANDLE, - stream_map: [enum_fields.len]StreamEnum, - - pub fn removeAt(self: *@This(), index: u32) void { - assert(index < self.count); - for (index + 1..self.count) |i| { - self.handles_buf[i - 1] = self.handles_buf[i]; - self.stream_map[i - 1] = self.stream_map[i]; - } - self.count -= 1; - } - }, - } else void, - - const Self = @This(); - - pub fn deinit(self: *Self) void { - const gpa = self.gpa; - if (is_windows) { - // cancel any pending IO to prevent clobbering OVERLAPPED value - for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| { - _ = windows.kernel32.CancelIo(h); - } - } - inline for (&self.readers) |*r| gpa.free(r.buffer); - self.* = undefined; - } - - pub fn poll(self: *Self) !bool { - if (is_windows) { - return pollWindows(self, null); - } else { - return pollPosix(self, null); - } - } - - pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool { - if (is_windows) { - return pollWindows(self, nanoseconds); - } else { - return pollPosix(self, nanoseconds); - } - } - - pub fn reader(self: *Self, which: StreamEnum) *Reader { - return &self.readers[@intFromEnum(which)]; - } - - pub fn toOwnedSlice(self: *Self, which: StreamEnum) error{OutOfMemory}![]u8 { - const gpa = self.gpa; - const r = reader(self, which); - if (r.seek == 0) { - const new = try gpa.realloc(r.buffer, r.end); - r.buffer = &.{}; - r.end = 0; - return new; - } - const new = try gpa.dupe(u8, r.buffered()); - gpa.free(r.buffer); - r.buffer = &.{}; - r.seek = 0; - r.end = 0; - return new; - } - - fn pollWindows(self: *Self, nanoseconds: ?u64) !bool { - const bump_amt = 512; - const gpa = self.gpa; - - if (!self.windows.first_read_done) { - var already_read_data = false; - for (0..enum_fields.len) |i| { - const handle = self.windows.active.handles_buf[i]; - switch (try windowsAsyncReadToFifoAndQueueSmallRead( - gpa, - handle, - &self.windows.overlapped[i], - &self.readers[i], - &self.windows.small_bufs[i], - bump_amt, - )) { - .populated, .empty => |state| { - if (state == .populated) already_read_data = true; - self.windows.active.handles_buf[self.windows.active.count] = handle; - self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i)); - self.windows.active.count += 1; - }, - .closed => {}, // don't add to the wait_objects list - .closed_populated => { - // don't add to the wait_objects list, but we did already get data - already_read_data = true; - }, - } - } - self.windows.first_read_done = true; - if (already_read_data) return true; - } - - while (true) { - if (self.windows.active.count == 0) return false; - - const status = windows.kernel32.WaitForMultipleObjects( - self.windows.active.count, - &self.windows.active.handles_buf, - 0, - if (nanoseconds) |ns| - @min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1) - else - windows.INFINITE, - ); - if (status == windows.WAIT_FAILED) - return windows.unexpectedError(windows.GetLastError()); - if (status == windows.WAIT_TIMEOUT) - return true; - - if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1) - unreachable; - - const active_idx = status - windows.WAIT_OBJECT_0; - - const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]); - const handle = self.windows.active.handles_buf[active_idx]; - - const overlapped = &self.windows.overlapped[stream_idx]; - const stream_reader = &self.readers[stream_idx]; - const small_buf = &self.windows.small_bufs[stream_idx]; - - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => { - self.windows.active.removeAt(active_idx); - continue; - }, - .aborted => unreachable, - }; - const buf = small_buf[0..num_bytes_read]; - const dest = try writableSliceGreedyAlloc(stream_reader, gpa, buf.len); - @memcpy(dest[0..buf.len], buf); - advanceBufferEnd(stream_reader, buf.len); - - switch (try windowsAsyncReadToFifoAndQueueSmallRead( - gpa, - handle, - overlapped, - stream_reader, - small_buf, - bump_amt, - )) { - .empty => {}, // irrelevant, we already got data from the small buffer - .populated => {}, - .closed, - .closed_populated, // identical, since we already got data from the small buffer - => self.windows.active.removeAt(active_idx), - } - return true; - } - } - - fn pollPosix(self: *Self, nanoseconds: ?u64) !bool { - const gpa = self.gpa; - // We ask for ensureUnusedCapacity with this much extra space. This - // has more of an effect on small reads because once the reads - // start to get larger the amount of space an ArrayList will - // allocate grows exponentially. - const bump_amt = 512; - - const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP; - - const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns| - std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32) - else - -1); - if (events_len == 0) { - for (self.poll_fds) |poll_fd| { - if (poll_fd.fd != -1) return true; - } else return false; - } - - var keep_polling = false; - for (&self.poll_fds, &self.readers) |*poll_fd, *r| { - // Try reading whatever is available before checking the error - // conditions. - // It's still possible to read after a POLL.HUP is received, - // always check if there's some data waiting to be read first. - if (poll_fd.revents & posix.POLL.IN != 0) { - const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt); - const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) { - error.BrokenPipe => 0, // Handle the same as EOF. - else => |e| return e, - }; - advanceBufferEnd(r, amt); - if (amt == 0) { - // Remove the fd when the EOF condition is met. - poll_fd.fd = -1; - } else { - keep_polling = true; - } - } else if (poll_fd.revents & err_mask != 0) { - // Exclude the fds that signaled an error. - poll_fd.fd = -1; - } else if (poll_fd.fd != -1) { - keep_polling = true; - } - } - return keep_polling; - } - - /// Returns a slice into the unused capacity of `buffer` with at least - /// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary. - /// - /// After calling this function, typically the caller will follow up with a - /// call to `advanceBufferEnd` to report the actual number of bytes buffered. - fn writableSliceGreedyAlloc(r: *Reader, allocator: Allocator, min_len: usize) Allocator.Error![]u8 { - { - const unused = r.buffer[r.end..]; - if (unused.len >= min_len) return unused; - } - if (r.seek > 0) { - const data = r.buffer[r.seek..r.end]; - @memmove(r.buffer[0..data.len], data); - r.seek = 0; - r.end = data.len; - } - { - var list: std.ArrayList(u8) = .{ - .items = r.buffer[0..r.end], - .capacity = r.buffer.len, - }; - defer r.buffer = list.allocatedSlice(); - try list.ensureUnusedCapacity(allocator, min_len); - } - const unused = r.buffer[r.end..]; - assert(unused.len >= min_len); - return unused; - } - - /// After writing directly into the unused capacity of `buffer`, this function - /// updates `end` so that users of `Reader` can receive the data. - fn advanceBufferEnd(r: *Reader, n: usize) void { - assert(n <= r.buffer.len - r.end); - r.end += n; - } - - /// The `ReadFile` docuementation states that `lpNumberOfBytesRead` does not have a meaningful - /// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For - /// compatibility, we point it to this dummy variables, which we never otherwise access. - /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile - var win_dummy_bytes_read: u32 = undefined; - - /// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before - /// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data - /// is available. `handle` must have no pending asynchronous operation. - fn windowsAsyncReadToFifoAndQueueSmallRead( - gpa: Allocator, - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - r: *Reader, - small_buf: *[128]u8, - bump_amt: usize, - ) !enum { empty, populated, closed_populated, closed } { - var read_any_data = false; - while (true) { - const fifo_read_pending = while (true) { - const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt); - const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32); - - if (0 == windows.kernel32.ReadFile( - handle, - buf.ptr, - buf_len, - &win_dummy_bytes_read, - overlapped, - )) switch (windows.GetLastError()) { - .IO_PENDING => break true, - .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, - else => |err| return windows.unexpectedError(err), - }; - - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => unreachable, - }; - - read_any_data = true; - advanceBufferEnd(r, num_bytes_read); - - if (num_bytes_read == buf_len) { - // We filled the buffer, so there's probably more data available. - continue; - } else { - // We didn't fill the buffer, so assume we're out of data. - // There is no pending read. - break false; - } - }; - - if (fifo_read_pending) cancel_read: { - // Cancel the pending read into the FIFO. - _ = windows.kernel32.CancelIo(handle); - - // We have to wait for the handle to be signalled, i.e. for the cancelation to complete. - switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) { - windows.WAIT_OBJECT_0 => {}, - windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()), - else => unreachable, - } - - // If it completed before we canceled, make sure to tell the FIFO! - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => break :cancel_read, - }; - read_any_data = true; - advanceBufferEnd(r, num_bytes_read); - } - - // Try to queue the 1-byte read. - if (0 == windows.kernel32.ReadFile( - handle, - small_buf, - small_buf.len, - &win_dummy_bytes_read, - overlapped, - )) switch (windows.GetLastError()) { - .IO_PENDING => { - // 1-byte read pending as intended - return if (read_any_data) .populated else .empty; - }, - .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, - else => |err| return windows.unexpectedError(err), - }; - - // We got data back this time. Write it to the FIFO and run the main loop again. - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => unreachable, - }; - const buf = small_buf[0..num_bytes_read]; - const dest = try writableSliceGreedyAlloc(r, gpa, buf.len); - @memcpy(dest[0..buf.len], buf); - advanceBufferEnd(r, buf.len); - read_any_data = true; - } - } - - /// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation. - /// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected). - /// - /// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the - /// operation immediately returns data: - /// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially - /// erroneous results." - /// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...] - /// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to - /// get the actual number of bytes read." - /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile - fn windowsGetReadResult( - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - allow_aborted: bool, - ) !union(enum) { - success: u32, - closed, - aborted, - } { - var num_bytes_read: u32 = undefined; - if (0 == windows.kernel32.GetOverlappedResult( - handle, - overlapped, - &num_bytes_read, - 0, - )) switch (windows.GetLastError()) { - .BROKEN_PIPE => return .closed, - .OPERATION_ABORTED => |err| if (allow_aborted) { - return .aborted; - } else { - return windows.unexpectedError(err); - }, - else => |err| return windows.unexpectedError(err), - }; - return .{ .success = num_bytes_read }; - } - }; -} - -/// Given an enum, returns a struct with fields of that enum, each field -/// representing an I/O stream for polling. -pub fn PollFiles(comptime StreamEnum: type) type { - return @Struct(.auto, null, std.meta.fieldNames(StreamEnum), &@splat(Io.File), &@splat(.{})); -} - userdata: ?*anyopaque, vtable: *const VTable, @@ -704,18 +254,18 @@ pub const VTable = struct { pub const Limit = enum(usize) { nothing = 0, - unlimited = std.math.maxInt(usize), + unlimited = math.maxInt(usize), _, - /// `std.math.maxInt(usize)` is interpreted to mean `.unlimited`. + /// `math.maxInt(usize)` is interpreted to mean `.unlimited`. pub fn limited(n: usize) Limit { return @enumFromInt(n); } - /// Any value grater than `std.math.maxInt(usize)` is interpreted to mean + /// Any value grater than `math.maxInt(usize)` is interpreted to mean /// `.unlimited`. pub fn limited64(n: u64) Limit { - return @enumFromInt(@min(n, std.math.maxInt(usize))); + return @enumFromInt(@min(n, math.maxInt(usize))); } pub fn countVec(data: []const []const u8) Limit { @@ -929,9 +479,9 @@ pub const Clock = enum { }; } - pub fn compare(lhs: Clock.Timestamp, op: std.math.CompareOperator, rhs: Clock.Timestamp) bool { + pub fn compare(lhs: Clock.Timestamp, op: math.CompareOperator, rhs: Clock.Timestamp) bool { assert(lhs.clock == rhs.clock); - return std.math.compare(lhs.raw.nanoseconds, op, rhs.raw.nanoseconds); + return math.compare(lhs.raw.nanoseconds, op, rhs.raw.nanoseconds); } }; @@ -996,7 +546,7 @@ pub const Duration = struct { nanoseconds: i96, pub const zero: Duration = .{ .nanoseconds = 0 }; - pub const max: Duration = .{ .nanoseconds = std.math.maxInt(i96) }; + pub const max: Duration = .{ .nanoseconds = math.maxInt(i96) }; pub fn fromNanoseconds(x: i96) Duration { return .{ .nanoseconds = x }; @@ -1652,7 +1202,7 @@ pub const Event = enum(u32) { pub fn set(e: *Event, io: Io) void { switch (@atomicRmw(Event, e, .Xchg, .is_set, .release)) { .unset, .is_set => {}, - .waiting => io.futexWake(Event, e, std.math.maxInt(u32)), + .waiting => io.futexWake(Event, e, math.maxInt(u32)), } }