diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 74ddd1dccf..a892c0123c 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -184,7 +184,6 @@ pub const VTable = struct { fileStat: *const fn (?*anyopaque, File) File.StatError!File.Stat, fileLength: *const fn (?*anyopaque, File) File.LengthError!u64, fileClose: *const fn (?*anyopaque, []const File) void, - fileWriteStreaming: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize) File.Writer.Error!usize, fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize, fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize, fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize, @@ -257,6 +256,7 @@ pub const VTable = struct { pub const Operation = union(enum) { file_read_streaming: FileReadStreaming, + file_write_streaming: FileWriteStreaming, pub const Tag = @typeInfo(Operation).@"union".tag_type.?; @@ -290,6 +290,40 @@ pub const Operation = union(enum) { pub const Result = Error!usize; }; + pub const FileWriteStreaming = struct { + file: File, + header: []const u8 = &.{}, + data: []const []const u8, + splat: usize = 1, + + pub const Error = error{ + DiskQuota, + FileTooBig, + InputOutput, + NoSpaceLeft, + DeviceBusy, + /// File descriptor does not hold the required rights to write to it. + AccessDenied, + PermissionDenied, + /// File is an unconnected socket, or closed its read end. + BrokenPipe, + /// Insufficient kernel memory to read from in_fd. + SystemResources, + NotOpenForWriting, + /// The process cannot access the file because another process has locked + /// a portion of the file. Windows-only. + LockViolation, + /// Non-blocking has been enabled and this operation would block. + WouldBlock, + /// This error occurs when a device gets disconnected before or mid-flush + /// while it's being written to - errno(6): No such device or address. + NoDevice, + FileBusy, + } || Io.UnexpectedError; + + pub const Result = Error!usize; + }; + pub const Result = Result: { const operation_fields = @typeInfo(Operation).@"union".fields; var field_names: [operation_fields.len][]const u8 = undefined; diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig index e0297e0573..ba7f5d01e0 100644 --- a/lib/std/Io/File.zig +++ b/lib/std/Io/File.zig @@ -572,16 +572,16 @@ pub fn setTimestampsNow(file: File, io: Io) SetTimestampsError!void { pub const ReadStreamingError = error{EndOfStream} || Reader.Error; -/// Returns 0 on stream end or if `buffer` has no space available for data. +/// May return fewer bytes than buffer space available, including 0. +/// End-of-stream is indicated by `error.EndOfStream`. /// /// See also: /// * `reader` pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize { - const result = try io.operate(.{ .file_read_streaming = .{ + return (try io.operate(.{ .file_read_streaming = .{ .file = file, .data = buffer, - } }); - return result.file_read_streaming; + } })).file_read_streaming; } pub const ReadPositionalError = error{ @@ -714,11 +714,22 @@ pub fn writerStreaming(file: File, io: Io, buffer: []u8) Writer { return .initStreaming(file, io, buffer); } +/// This is a low-level API that calls the `Io` interface function directly. +/// For a higher level API, see `writerStreaming`. +pub fn writeStreaming(file: File, io: Io, header: []const u8, data: []const []const u8, splat: usize) Writer.Error!usize { + return (try io.operate(.{ .file_write_streaming = .{ + .file = file, + .header = header, + .data = data, + .splat = splat, + } })).file_write_streaming; +} + /// Equivalent to creating a streaming writer, writing `bytes`, and then flushing. pub fn writeStreamingAll(file: File, io: Io, bytes: []const u8) Writer.Error!void { var index: usize = 0; while (index < bytes.len) { - index += try io.vtable.fileWriteStreaming(io.userdata, file, &.{}, &.{bytes[index..]}, 1); + index += try writeStreaming(file, io, &.{}, &.{bytes[index..]}, 1); } } diff --git a/lib/std/Io/File/Writer.zig b/lib/std/Io/File/Writer.zig index 68a68e28ec..83f99d2ffc 100644 --- a/lib/std/Io/File/Writer.zig +++ b/lib/std/Io/File/Writer.zig @@ -20,30 +20,7 @@ interface: Io.Writer, pub const Mode = File.Reader.Mode; -pub const Error = error{ - DiskQuota, - FileTooBig, - InputOutput, - NoSpaceLeft, - DeviceBusy, - /// File descriptor does not hold the required rights to write to it. - AccessDenied, - PermissionDenied, - /// File is an unconnected socket, or closed its read end. - BrokenPipe, - /// Insufficient kernel memory to read from in_fd. - SystemResources, - NotOpenForWriting, - /// The process cannot access the file because another process has locked - /// a portion of the file. Windows-only. - LockViolation, - /// Non-blocking has been enabled and this operation would block. - WouldBlock, - /// This error occurs when a device gets disconnected before or mid-flush - /// while it's being written to - errno(6): No such device or address. - NoDevice, - FileBusy, -} || Io.Cancelable || Io.UnexpectedError; +pub const Error = Io.Operation.FileWriteStreaming.Error || Io.Cancelable; pub const WriteFileError = Error || error{ /// Descriptor is not valid or locked, or an mmap(2)-like operation is not available for in_fd. @@ -146,7 +123,7 @@ fn drainPositional(w: *Writer, data: []const []const u8, splat: usize) Io.Writer fn drainStreaming(w: *Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize { const io = w.io; const header = w.interface.buffered(); - const n = io.vtable.fileWriteStreaming(io.userdata, w.file, header, data, splat) catch |err| { + const n = w.file.writeStreaming(io, header, data, splat) catch |err| { w.err = err; return error.WriteFailed; }; diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 12521ea1af..8de5e2692a 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1649,7 +1649,6 @@ pub fn io(t: *Threaded) Io { .fileStat = fileStat, .fileLength = fileLength, .fileClose = fileClose, - .fileWriteStreaming = fileWriteStreaming, .fileWritePositional = fileWritePositional, .fileWriteFileStreaming = fileWriteFileStreaming, .fileWriteFilePositional = fileWriteFilePositional, @@ -1813,7 +1812,6 @@ pub fn ioBasic(t: *Threaded) Io { .fileStat = fileStat, .fileLength = fileLength, .fileClose = fileClose, - .fileWriteStreaming = fileWriteStreaming, .fileWritePositional = fileWritePositional, .fileWriteFileStreaming = fileWriteFileStreaming, .fileWriteFilePositional = fileWriteFilePositional, @@ -2496,6 +2494,12 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper else => |e| e, }, }, + .file_write_streaming => |o| return .{ + .file_write_streaming = fileWriteStreaming(t, o.file, o.header, o.data, o.splat) catch |err| switch (err) { + error.Canceled => |e| return e, + else => |e| e, + }, + }, } } @@ -2523,6 +2527,10 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void { poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 }; poll_len += 1; }, + .file_write_streaming => |o| { + poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.OUT, .revents = 0 }; + poll_len += 1; + }, } index = submission.node.next; } @@ -2687,6 +2695,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout const submission = &b.storage[index.toIndex()].submission; switch (submission.operation) { .file_read_streaming => |o| try poll_storage.add(o.file, posix.POLL.IN), + .file_write_streaming => |o| try poll_storage.add(o.file, posix.POLL.OUT), } index = submission.node.next; } @@ -2864,6 +2873,7 @@ fn batchApc(apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, _: windows b.completions.tail = .fromIndex(index); const result: Io.Operation.Result = switch (pending.tag) { .file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) }, + .file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) }, }; storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; }, @@ -2950,6 +2960,66 @@ fn batchAwaitWindows(b: *Io.Batch, concurrency: bool) error{ Canceled, Concurren else => |status| { syscall.finish(); + context.iosb.u.Status = status; + batchApc(b, &context.iosb, 0); + break; + }, + }; + } + }, + .file_write_streaming => |o| o: { + const buffer = windowsWriteBuffer(o.header, o.data, o.splat); + if (buffer.len == 0) { + context.iosb = .{ + .u = .{ .Status = .SUCCESS }, + .Information = 0, + }; + batchApc(b, &context.iosb, 0); + break :o; + } + if (o.file.flags.nonblocking) { + context.file = o.file.handle; + switch (windows.ntdll.NtWriteFile( + o.file.handle, + null, // event + &batchApc, + b, + &context.iosb, + buffer.ptr, + @intCast(buffer.len), + null, // byte offset + null, // key + )) { + .PENDING, .SUCCESS => {}, + .CANCELLED => unreachable, + else => |status| { + context.iosb.u.Status = status; + batchApc(b, &context.iosb, 0); + }, + } + } else { + if (concurrency) return error.ConcurrencyUnavailable; + + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtWriteFile( + o.file.handle, + null, // event + null, // APC routine + null, // APC context + &context.iosb, + buffer.ptr, + @intCast(buffer.len), + null, // byte offset + null, // key + )) { + .PENDING => unreachable, // unrecoverable: wrong File nonblocking flag + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |status| { + syscall.finish(); + context.iosb.u.Status = status; batchApc(b, &context.iosb, 0); break; @@ -2963,6 +3033,21 @@ fn batchAwaitWindows(b: *Io.Batch, concurrency: bool) error{ Canceled, Concurren b.submissions = .{ .head = .none, .tail = .none }; } +/// Since Windows only supports writing one contiguous buffer, returns the +/// first one, while also limiting it to a length representable by 32-bit +/// unsigned integer. +fn windowsWriteBuffer(header: []const u8, data: []const []const u8, splat: usize) []const u8 { + const buffer = b: { + if (header.len != 0) break :b header; + for (data[0 .. data.len - 1]) |buffer| { + if (buffer.len != 0) break :b buffer; + } + if (splat == 0) return &.{}; + break :b data[data.len - 1]; + }; + return buffer[0..@min(buffer.len, std.math.maxInt(u32))]; +} + fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void { const ct = complete_tail.*; const len: u31 = @intCast(ring.len); @@ -9005,6 +9090,24 @@ fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize { } } +fn ntWriteFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize { + switch (io_status_block.u.Status) { + .PENDING => unreachable, + .CANCELLED => unreachable, + .SUCCESS => return io_status_block.Information, + .INVALID_USER_BUFFER => return error.SystemResources, + .NO_MEMORY => return error.SystemResources, + .QUOTA_EXCEEDED => return error.SystemResources, + .PIPE_BROKEN => return error.BrokenPipe, + .INVALID_HANDLE => return error.NotOpenForWriting, + .LOCK_NOT_GRANTED => return error.LockViolation, + .ACCESS_DENIED => return error.AccessDenied, + .WORKING_SET_QUOTA => return error.SystemResources, + .DISK_FULL => return error.NoSpaceLeft, + else => |status| return windows.unexpectedStatus(status), + } +} + fn fileReadPositionalPosix(file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize { if (!have_preadv) @compileError("TODO implement fileReadPositionalPosix for cursed operating systems that don't support preadv (it's only Haiku)"); @@ -9837,16 +9940,9 @@ fn fileWriteStreaming( _ = t; if (is_windows) { - if (header.len != 0) { - return writeFileStreamingWindows(file.handle, header); - } - for (data[0 .. data.len - 1]) |buf| { - if (buf.len == 0) continue; - return writeFileStreamingWindows(file.handle, buf); - } - const pattern = data[data.len - 1]; - if (pattern.len == 0 or splat == 0) return 0; - return writeFileStreamingWindows(file.handle, pattern); + const buffer = windowsWriteBuffer(header, data, splat); + if (buffer.len == 0) return 0; + return fileWriteStreamingWindows(file, buffer); } var iovecs: [max_iovecs_len]posix.iovec_const = undefined; @@ -9953,38 +10049,66 @@ fn fileWriteStreaming( } } -fn writeFileStreamingWindows( - handle: windows.HANDLE, - bytes: []const u8, -) File.Writer.Error!usize { - assert(bytes.len != 0); - var bytes_written: windows.DWORD = undefined; - const adjusted_len = std.math.lossyCast(u32, bytes.len); - const syscall: Syscall = try .start(); - while (true) { - if (windows.kernel32.WriteFile(handle, bytes.ptr, adjusted_len, &bytes_written, null) != 0) { - syscall.finish(); - return bytes_written; +fn fileWriteStreamingWindows(file: File, buffer: []const u8) File.Writer.Error!usize { + assert(buffer.len != 0); + + var iosb: windows.IO_STATUS_BLOCK = undefined; + + if (file.flags.nonblocking) { + var done: bool = false; + switch (windows.ntdll.NtWriteFile( + file.handle, + null, // event + flagApc, + &done, // APC context + &iosb, + buffer.ptr, + @intCast(buffer.len), + null, // byte offset + null, // key + )) { + // We must wait for the APC routine. + .PENDING, .SUCCESS => while (!done) { + // Once we get here we must not return from the function until the + // operation completes, thereby releasing reference to io_status_block. + const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) { + error.Canceled => |e| { + var cancel_iosb: windows.IO_STATUS_BLOCK = undefined; + _ = windows.ntdll.NtCancelIoFileEx(file.handle, &iosb, &cancel_iosb); + while (!done) waitForApcOrAlert(); + return e; + }, + }; + waitForApcOrAlert(); + alertable_syscall.finish(); + }, + else => |status| iosb.u.Status = status, } - switch (windows.GetLastError()) { - .OPERATION_ABORTED => { + return ntWriteFileResult(&iosb); + } else { + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtWriteFile( + file.handle, + null, // event + null, // APC routine + null, // APC context + &iosb, + buffer.ptr, + @intCast(buffer.len), + null, // byte offset + null, // key + )) { + .PENDING => unreachable, // unrecoverable: wrong File nonblocking flag + .CANCELLED => { try syscall.checkCancel(); continue; }, - .INVALID_USER_BUFFER => return syscall.fail(error.SystemResources), - .NOT_ENOUGH_MEMORY => return syscall.fail(error.SystemResources), - .NOT_ENOUGH_QUOTA => return syscall.fail(error.SystemResources), - .NO_DATA => return syscall.fail(error.BrokenPipe), - .INVALID_HANDLE => return syscall.fail(error.NotOpenForWriting), - .LOCK_VIOLATION => return syscall.fail(error.LockViolation), - .ACCESS_DENIED => return syscall.fail(error.AccessDenied), - .WORKING_SET_QUOTA => return syscall.fail(error.SystemResources), - .DISK_FULL => return syscall.fail(error.NoSpaceLeft), - else => |err| { + else => |status| { syscall.finish(); - return windows.unexpectedError(err); + iosb.u.Status = status; + return ntWriteFileResult(&iosb); }, - } + }; } } diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index 0fefc77a32..ee2a993a4d 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -1437,7 +1437,7 @@ fn writeIpc(io: Io, file: Io.File, serialized: Serialized) error{BrokenPipe}!voi // We do this in a separate write call to give a better chance for the // writev below to be in a single packet. const n = @min(parents.len, remaining_write_trash_bytes); - if (io.vtable.fileWriteStreaming(io.userdata, file, &.{}, &.{parents[0..n]}, 1)) |written| { + if (file.writeStreaming(io, &.{}, &.{parents[0..n]}, 1)) |written| { remaining_write_trash_bytes -= written; continue; } else |err| switch (err) { @@ -1478,7 +1478,7 @@ fn writevNonblock(io: Io, file: Io.File, iov: [][]const u8) Io.File.Writer.Error return total_written) : (iov_index += 1) written -= iov[iov_index].len; iov[iov_index].ptr += written; iov[iov_index].len -= written; - written = try io.vtable.fileWriteStreaming(io.userdata, file, &.{}, iov, 1); + written = try file.writeStreaming(io, &.{}, iov, 1); if (written == 0) return total_written; total_written += written; }