std.Io: move fileWriteStreaming to Operation

This serves as an example to contributors of how to move VTable
functions to becoming an Operation, thereby enabling Batch API and
timeouts.
This commit is contained in:
Andrew Kelley
2026-01-30 22:00:45 -08:00
parent 9bd648bd40
commit cc442d24ab
5 changed files with 217 additions and 71 deletions
+35 -1
View File
@@ -184,7 +184,6 @@ pub const VTable = struct {
fileStat: *const fn (?*anyopaque, File) File.StatError!File.Stat,
fileLength: *const fn (?*anyopaque, File) File.LengthError!u64,
fileClose: *const fn (?*anyopaque, []const File) void,
fileWriteStreaming: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize) File.Writer.Error!usize,
fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize,
fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize,
fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize,
@@ -257,6 +256,7 @@ pub const VTable = struct {
pub const Operation = union(enum) {
file_read_streaming: FileReadStreaming,
file_write_streaming: FileWriteStreaming,
pub const Tag = @typeInfo(Operation).@"union".tag_type.?;
@@ -290,6 +290,40 @@ pub const Operation = union(enum) {
pub const Result = Error!usize;
};
pub const FileWriteStreaming = struct {
file: File,
header: []const u8 = &.{},
data: []const []const u8,
splat: usize = 1,
pub const Error = error{
DiskQuota,
FileTooBig,
InputOutput,
NoSpaceLeft,
DeviceBusy,
/// File descriptor does not hold the required rights to write to it.
AccessDenied,
PermissionDenied,
/// File is an unconnected socket, or closed its read end.
BrokenPipe,
/// Insufficient kernel memory to read from in_fd.
SystemResources,
NotOpenForWriting,
/// The process cannot access the file because another process has locked
/// a portion of the file. Windows-only.
LockViolation,
/// Non-blocking has been enabled and this operation would block.
WouldBlock,
/// This error occurs when a device gets disconnected before or mid-flush
/// while it's being written to - errno(6): No such device or address.
NoDevice,
FileBusy,
} || Io.UnexpectedError;
pub const Result = Error!usize;
};
pub const Result = Result: {
const operation_fields = @typeInfo(Operation).@"union".fields;
var field_names: [operation_fields.len][]const u8 = undefined;
+16 -5
View File
@@ -572,16 +572,16 @@ pub fn setTimestampsNow(file: File, io: Io) SetTimestampsError!void {
pub const ReadStreamingError = error{EndOfStream} || Reader.Error;
/// Returns 0 on stream end or if `buffer` has no space available for data.
/// May return fewer bytes than buffer space available, including 0.
/// End-of-stream is indicated by `error.EndOfStream`.
///
/// See also:
/// * `reader`
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize {
const result = try io.operate(.{ .file_read_streaming = .{
return (try io.operate(.{ .file_read_streaming = .{
.file = file,
.data = buffer,
} });
return result.file_read_streaming;
} })).file_read_streaming;
}
pub const ReadPositionalError = error{
@@ -714,11 +714,22 @@ pub fn writerStreaming(file: File, io: Io, buffer: []u8) Writer {
return .initStreaming(file, io, buffer);
}
/// This is a low-level API that calls the `Io` interface function directly.
/// For a higher level API, see `writerStreaming`.
pub fn writeStreaming(file: File, io: Io, header: []const u8, data: []const []const u8, splat: usize) Writer.Error!usize {
return (try io.operate(.{ .file_write_streaming = .{
.file = file,
.header = header,
.data = data,
.splat = splat,
} })).file_write_streaming;
}
/// Equivalent to creating a streaming writer, writing `bytes`, and then flushing.
pub fn writeStreamingAll(file: File, io: Io, bytes: []const u8) Writer.Error!void {
var index: usize = 0;
while (index < bytes.len) {
index += try io.vtable.fileWriteStreaming(io.userdata, file, &.{}, &.{bytes[index..]}, 1);
index += try writeStreaming(file, io, &.{}, &.{bytes[index..]}, 1);
}
}
+2 -25
View File
@@ -20,30 +20,7 @@ interface: Io.Writer,
pub const Mode = File.Reader.Mode;
pub const Error = error{
DiskQuota,
FileTooBig,
InputOutput,
NoSpaceLeft,
DeviceBusy,
/// File descriptor does not hold the required rights to write to it.
AccessDenied,
PermissionDenied,
/// File is an unconnected socket, or closed its read end.
BrokenPipe,
/// Insufficient kernel memory to read from in_fd.
SystemResources,
NotOpenForWriting,
/// The process cannot access the file because another process has locked
/// a portion of the file. Windows-only.
LockViolation,
/// Non-blocking has been enabled and this operation would block.
WouldBlock,
/// This error occurs when a device gets disconnected before or mid-flush
/// while it's being written to - errno(6): No such device or address.
NoDevice,
FileBusy,
} || Io.Cancelable || Io.UnexpectedError;
pub const Error = Io.Operation.FileWriteStreaming.Error || Io.Cancelable;
pub const WriteFileError = Error || error{
/// Descriptor is not valid or locked, or an mmap(2)-like operation is not available for in_fd.
@@ -146,7 +123,7 @@ fn drainPositional(w: *Writer, data: []const []const u8, splat: usize) Io.Writer
fn drainStreaming(w: *Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
const io = w.io;
const header = w.interface.buffered();
const n = io.vtable.fileWriteStreaming(io.userdata, w.file, header, data, splat) catch |err| {
const n = w.file.writeStreaming(io, header, data, splat) catch |err| {
w.err = err;
return error.WriteFailed;
};
+162 -38
View File
@@ -1649,7 +1649,6 @@ pub fn io(t: *Threaded) Io {
.fileStat = fileStat,
.fileLength = fileLength,
.fileClose = fileClose,
.fileWriteStreaming = fileWriteStreaming,
.fileWritePositional = fileWritePositional,
.fileWriteFileStreaming = fileWriteFileStreaming,
.fileWriteFilePositional = fileWriteFilePositional,
@@ -1813,7 +1812,6 @@ pub fn ioBasic(t: *Threaded) Io {
.fileStat = fileStat,
.fileLength = fileLength,
.fileClose = fileClose,
.fileWriteStreaming = fileWriteStreaming,
.fileWritePositional = fileWritePositional,
.fileWriteFileStreaming = fileWriteFileStreaming,
.fileWriteFilePositional = fileWriteFilePositional,
@@ -2496,6 +2494,12 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
else => |e| e,
},
},
.file_write_streaming => |o| return .{
.file_write_streaming = fileWriteStreaming(t, o.file, o.header, o.data, o.splat) catch |err| switch (err) {
error.Canceled => |e| return e,
else => |e| e,
},
},
}
}
@@ -2523,6 +2527,10 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 };
poll_len += 1;
},
.file_write_streaming => |o| {
poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.OUT, .revents = 0 };
poll_len += 1;
},
}
index = submission.node.next;
}
@@ -2687,6 +2695,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
const submission = &b.storage[index.toIndex()].submission;
switch (submission.operation) {
.file_read_streaming => |o| try poll_storage.add(o.file, posix.POLL.IN),
.file_write_streaming => |o| try poll_storage.add(o.file, posix.POLL.OUT),
}
index = submission.node.next;
}
@@ -2864,6 +2873,7 @@ fn batchApc(apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, _: windows
b.completions.tail = .fromIndex(index);
const result: Io.Operation.Result = switch (pending.tag) {
.file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) },
.file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) },
};
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
},
@@ -2950,6 +2960,66 @@ fn batchAwaitWindows(b: *Io.Batch, concurrency: bool) error{ Canceled, Concurren
else => |status| {
syscall.finish();
context.iosb.u.Status = status;
batchApc(b, &context.iosb, 0);
break;
},
};
}
},
.file_write_streaming => |o| o: {
const buffer = windowsWriteBuffer(o.header, o.data, o.splat);
if (buffer.len == 0) {
context.iosb = .{
.u = .{ .Status = .SUCCESS },
.Information = 0,
};
batchApc(b, &context.iosb, 0);
break :o;
}
if (o.file.flags.nonblocking) {
context.file = o.file.handle;
switch (windows.ntdll.NtWriteFile(
o.file.handle,
null, // event
&batchApc,
b,
&context.iosb,
buffer.ptr,
@intCast(buffer.len),
null, // byte offset
null, // key
)) {
.PENDING, .SUCCESS => {},
.CANCELLED => unreachable,
else => |status| {
context.iosb.u.Status = status;
batchApc(b, &context.iosb, 0);
},
}
} else {
if (concurrency) return error.ConcurrencyUnavailable;
const syscall: Syscall = try .start();
while (true) switch (windows.ntdll.NtWriteFile(
o.file.handle,
null, // event
null, // APC routine
null, // APC context
&context.iosb,
buffer.ptr,
@intCast(buffer.len),
null, // byte offset
null, // key
)) {
.PENDING => unreachable, // unrecoverable: wrong File nonblocking flag
.CANCELLED => {
try syscall.checkCancel();
continue;
},
else => |status| {
syscall.finish();
context.iosb.u.Status = status;
batchApc(b, &context.iosb, 0);
break;
@@ -2963,6 +3033,21 @@ fn batchAwaitWindows(b: *Io.Batch, concurrency: bool) error{ Canceled, Concurren
b.submissions = .{ .head = .none, .tail = .none };
}
/// Since Windows only supports writing one contiguous buffer, returns the
/// first one, while also limiting it to a length representable by 32-bit
/// unsigned integer.
fn windowsWriteBuffer(header: []const u8, data: []const []const u8, splat: usize) []const u8 {
const buffer = b: {
if (header.len != 0) break :b header;
for (data[0 .. data.len - 1]) |buffer| {
if (buffer.len != 0) break :b buffer;
}
if (splat == 0) return &.{};
break :b data[data.len - 1];
};
return buffer[0..@min(buffer.len, std.math.maxInt(u32))];
}
fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void {
const ct = complete_tail.*;
const len: u31 = @intCast(ring.len);
@@ -9005,6 +9090,24 @@ fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize {
}
}
fn ntWriteFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize {
switch (io_status_block.u.Status) {
.PENDING => unreachable,
.CANCELLED => unreachable,
.SUCCESS => return io_status_block.Information,
.INVALID_USER_BUFFER => return error.SystemResources,
.NO_MEMORY => return error.SystemResources,
.QUOTA_EXCEEDED => return error.SystemResources,
.PIPE_BROKEN => return error.BrokenPipe,
.INVALID_HANDLE => return error.NotOpenForWriting,
.LOCK_NOT_GRANTED => return error.LockViolation,
.ACCESS_DENIED => return error.AccessDenied,
.WORKING_SET_QUOTA => return error.SystemResources,
.DISK_FULL => return error.NoSpaceLeft,
else => |status| return windows.unexpectedStatus(status),
}
}
fn fileReadPositionalPosix(file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize {
if (!have_preadv) @compileError("TODO implement fileReadPositionalPosix for cursed operating systems that don't support preadv (it's only Haiku)");
@@ -9837,16 +9940,9 @@ fn fileWriteStreaming(
_ = t;
if (is_windows) {
if (header.len != 0) {
return writeFileStreamingWindows(file.handle, header);
}
for (data[0 .. data.len - 1]) |buf| {
if (buf.len == 0) continue;
return writeFileStreamingWindows(file.handle, buf);
}
const pattern = data[data.len - 1];
if (pattern.len == 0 or splat == 0) return 0;
return writeFileStreamingWindows(file.handle, pattern);
const buffer = windowsWriteBuffer(header, data, splat);
if (buffer.len == 0) return 0;
return fileWriteStreamingWindows(file, buffer);
}
var iovecs: [max_iovecs_len]posix.iovec_const = undefined;
@@ -9953,38 +10049,66 @@ fn fileWriteStreaming(
}
}
fn writeFileStreamingWindows(
handle: windows.HANDLE,
bytes: []const u8,
) File.Writer.Error!usize {
assert(bytes.len != 0);
var bytes_written: windows.DWORD = undefined;
const adjusted_len = std.math.lossyCast(u32, bytes.len);
const syscall: Syscall = try .start();
while (true) {
if (windows.kernel32.WriteFile(handle, bytes.ptr, adjusted_len, &bytes_written, null) != 0) {
syscall.finish();
return bytes_written;
fn fileWriteStreamingWindows(file: File, buffer: []const u8) File.Writer.Error!usize {
assert(buffer.len != 0);
var iosb: windows.IO_STATUS_BLOCK = undefined;
if (file.flags.nonblocking) {
var done: bool = false;
switch (windows.ntdll.NtWriteFile(
file.handle,
null, // event
flagApc,
&done, // APC context
&iosb,
buffer.ptr,
@intCast(buffer.len),
null, // byte offset
null, // key
)) {
// We must wait for the APC routine.
.PENDING, .SUCCESS => while (!done) {
// Once we get here we must not return from the function until the
// operation completes, thereby releasing reference to io_status_block.
const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) {
error.Canceled => |e| {
var cancel_iosb: windows.IO_STATUS_BLOCK = undefined;
_ = windows.ntdll.NtCancelIoFileEx(file.handle, &iosb, &cancel_iosb);
while (!done) waitForApcOrAlert();
return e;
},
};
waitForApcOrAlert();
alertable_syscall.finish();
},
else => |status| iosb.u.Status = status,
}
switch (windows.GetLastError()) {
.OPERATION_ABORTED => {
return ntWriteFileResult(&iosb);
} else {
const syscall: Syscall = try .start();
while (true) switch (windows.ntdll.NtWriteFile(
file.handle,
null, // event
null, // APC routine
null, // APC context
&iosb,
buffer.ptr,
@intCast(buffer.len),
null, // byte offset
null, // key
)) {
.PENDING => unreachable, // unrecoverable: wrong File nonblocking flag
.CANCELLED => {
try syscall.checkCancel();
continue;
},
.INVALID_USER_BUFFER => return syscall.fail(error.SystemResources),
.NOT_ENOUGH_MEMORY => return syscall.fail(error.SystemResources),
.NOT_ENOUGH_QUOTA => return syscall.fail(error.SystemResources),
.NO_DATA => return syscall.fail(error.BrokenPipe),
.INVALID_HANDLE => return syscall.fail(error.NotOpenForWriting),
.LOCK_VIOLATION => return syscall.fail(error.LockViolation),
.ACCESS_DENIED => return syscall.fail(error.AccessDenied),
.WORKING_SET_QUOTA => return syscall.fail(error.SystemResources),
.DISK_FULL => return syscall.fail(error.NoSpaceLeft),
else => |err| {
else => |status| {
syscall.finish();
return windows.unexpectedError(err);
iosb.u.Status = status;
return ntWriteFileResult(&iosb);
},
}
};
}
}
+2 -2
View File
@@ -1437,7 +1437,7 @@ fn writeIpc(io: Io, file: Io.File, serialized: Serialized) error{BrokenPipe}!voi
// We do this in a separate write call to give a better chance for the
// writev below to be in a single packet.
const n = @min(parents.len, remaining_write_trash_bytes);
if (io.vtable.fileWriteStreaming(io.userdata, file, &.{}, &.{parents[0..n]}, 1)) |written| {
if (file.writeStreaming(io, &.{}, &.{parents[0..n]}, 1)) |written| {
remaining_write_trash_bytes -= written;
continue;
} else |err| switch (err) {
@@ -1478,7 +1478,7 @@ fn writevNonblock(io: Io, file: Io.File, iov: [][]const u8) Io.File.Writer.Error
return total_written) : (iov_index += 1) written -= iov[iov_index].len;
iov[iov_index].ptr += written;
iov[iov_index].len -= written;
written = try io.vtable.fileWriteStreaming(io.userdata, file, &.{}, iov, 1);
written = try file.writeStreaming(io, &.{}, iov, 1);
if (written == 0) return total_written;
total_written += written;
}