std: remove error.BrokenPipe from file reads, add error.EndOfStream

and make reading file streaming allowed to return 0 byte reads.
According to Microsoft documentation, on Windows it is possible to get
0-byte reads from pipes when 0-byte writes are made.
This commit is contained in:
Andrew Kelley
2026-01-27 15:31:23 -08:00
parent 46b5eb6ec3
commit f612d5ae96
6 changed files with 63 additions and 76 deletions
+4 -7
View File
@@ -264,10 +264,7 @@ pub fn Poller(comptime StreamEnum: type) type {
// always check if there's some data waiting to be read first.
if (poll_fd.revents & posix.POLL.IN != 0) {
const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) {
error.BrokenPipe => 0, // Handle the same as EOF.
else => |e| return e,
};
const amt = try posix.read(poll_fd.fd, buf);
advanceBufferEnd(r, amt);
if (amt == 0) {
// Remove the fd when the EOF condition is met.
@@ -633,9 +630,9 @@ pub const VTable = struct {
fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize,
fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize,
fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize,
/// Returns 0 on end of stream.
fileReadStreaming: *const fn (?*anyopaque, File, data: []const []u8) File.Reader.Error!usize,
/// Returns 0 on end of stream.
/// May return 0 reads which is different than `error.EndOfStream`.
fileReadStreaming: *const fn (?*anyopaque, File, data: []const []u8) File.ReadStreamingError!usize,
/// Returns 0 if reading at or past the end.
fileReadPositional: *const fn (?*anyopaque, File, data: []const []u8, offset: u64) File.ReadPositionalError!usize,
fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void,
fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void,
+3 -2
View File
@@ -550,11 +550,13 @@ pub fn setTimestampsNow(file: File, io: Io) SetTimestampsError!void {
});
}
pub const ReadStreamingError = error{EndOfStream} || Reader.Error;
/// Returns 0 on stream end or if `buffer` has no space available for data.
///
/// See also:
/// * `reader`
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usize {
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize {
return io.vtable.fileReadStreaming(io.userdata, file, buffer);
}
@@ -563,7 +565,6 @@ pub const ReadPositionalError = error{
SystemResources,
/// Trying to read a directory file descriptor as if it were a file.
IsDir,
BrokenPipe,
/// Non-blocking has been enabled, and reading from the file descriptor
/// would block.
WouldBlock,
+18 -15
View File
@@ -31,7 +31,6 @@ pub const Error = error{
SystemResources,
/// Trying to read a directory file descriptor as if it were a file.
IsDir,
BrokenPipe,
ConnectionResetByPeer,
/// File was not opened with read capability.
NotOpenForReading,
@@ -300,14 +299,16 @@ fn readVecStreaming(r: *Reader, data: [][]u8) Io.Reader.Error!usize {
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = io.vtable.fileReadStreaming(io.userdata, r.file, dest) catch |err| {
r.err = err;
return error.ReadFailed;
const n = io.vtable.fileReadStreaming(io.userdata, r.file, dest) catch |err| switch (err) {
error.EndOfStream => {
r.size = r.pos;
return error.EndOfStream;
},
else => |e| {
r.err = e;
return error.ReadFailed;
},
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
if (n > data_size) {
r.interface.end += n - data_size;
@@ -355,14 +356,16 @@ fn discard(io_reader: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize {
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, &data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = io.vtable.fileReadStreaming(io.userdata, file, dest) catch |err| {
r.err = err;
return error.ReadFailed;
const n = io.vtable.fileReadStreaming(io.userdata, file, dest) catch |err| switch (err) {
error.EndOfStream => {
r.size = r.pos;
return error.EndOfStream;
},
else => |e| {
r.err = e;
return error.ReadFailed;
},
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
if (n > data_size) {
r.interface.end += n - data_size;
+37 -49
View File
@@ -8139,14 +8139,14 @@ fn fileClose(userdata: ?*anyopaque, files: []const File) void {
for (files) |file| posix.close(file.handle);
}
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.Reader.Error!usize {
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.ReadStreamingError!usize {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
if (is_windows) return fileReadStreamingWindows(file, data);
return fileReadStreamingPosix(file, data);
}
fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usize {
fn fileReadStreamingPosix(file: File, data: []const []u8) File.ReadStreamingError!usize {
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
var i: usize = 0;
for (data) |buf| {
@@ -8167,28 +8167,24 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz
switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) {
.SUCCESS => {
syscall.finish();
if (nread == 0) return error.EndOfStream;
return nread;
},
.INTR, .TIMEDOUT => {
try syscall.checkCancel();
continue;
},
else => |e| {
syscall.finish();
switch (e) {
.INVAL => |err| return errnoBug(err),
.FAULT => |err| return errnoBug(err),
.BADF => return error.IsDir, // File operation on directory.
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.NOTCAPABLE => return error.AccessDenied,
else => |err| return posix.unexpectedErrno(err),
}
},
.BADF => return syscall.fail(error.IsDir), // File operation on directory.
.IO => return syscall.fail(error.InputOutput),
.ISDIR => return syscall.fail(error.IsDir),
.NOBUFS => return syscall.fail(error.SystemResources),
.NOMEM => return syscall.fail(error.SystemResources),
.NOTCONN => return syscall.fail(error.SocketUnconnected),
.CONNRESET => return syscall.fail(error.ConnectionResetByPeer),
.NOTCAPABLE => return syscall.fail(error.AccessDenied),
.INVAL => |err| return syscall.errnoBug(err),
.FAULT => |err| return syscall.errnoBug(err),
else => |err| return syscall.unexpectedErrno(err),
}
}
}
@@ -8199,36 +8195,33 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz
switch (posix.errno(rc)) {
.SUCCESS => {
syscall.finish();
if (rc == 0) return error.EndOfStream;
return @intCast(rc);
},
.INTR, .TIMEDOUT => {
try syscall.checkCancel();
continue;
},
else => |e| {
.BADF => {
syscall.finish();
switch (e) {
.INVAL => |err| return errnoBug(err),
.FAULT => |err| return errnoBug(err),
.AGAIN => return error.WouldBlock,
.BADF => {
if (native_os == .wasi) return error.IsDir; // File operation on directory.
return error.NotOpenForReading;
},
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
else => |err| return posix.unexpectedErrno(err),
}
if (native_os == .wasi) return error.IsDir; // File operation on directory.
return error.NotOpenForReading;
},
.AGAIN => return syscall.fail(error.WouldBlock),
.IO => return syscall.fail(error.InputOutput),
.ISDIR => return syscall.fail(error.IsDir),
.NOBUFS => return syscall.fail(error.SystemResources),
.NOMEM => return syscall.fail(error.SystemResources),
.NOTCONN => return syscall.fail(error.SocketUnconnected),
.CONNRESET => return syscall.fail(error.ConnectionResetByPeer),
.INVAL => |err| return syscall.errnoBug(err),
.FAULT => |err| return syscall.errnoBug(err),
else => |err| return syscall.unexpectedErrno(err),
}
}
}
fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!usize {
fn fileReadStreamingWindows(file: File, data: []const []u8) File.ReadStreamingError!usize {
var index: usize = 0;
while (index < data.len and data[index].len == 0) index += 1;
if (index == data.len) return 0;
@@ -8250,25 +8243,18 @@ fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!us
null, // key
)) {
.SUCCESS => {
// Only END_OF_FILE is the true end.
if (io_status_block.Information == 0) {
try syscall.checkCancel();
continue;
} else {
syscall.finish();
io_status_block.u.Status = .SUCCESS;
return io_status_block.Information;
}
},
.END_OF_FILE, .PIPE_BROKEN => {
syscall.finish();
return io_status_block.Information;
},
.PENDING => break,
.END_OF_FILE, .PIPE_BROKEN => {
syscall.finish();
return error.EndOfStream;
},
.CANCELLED => {
try syscall.checkCancel();
continue;
},
.PENDING => break,
.INVALID_DEVICE_REQUEST => return syscall.fail(error.IsDir),
.LOCK_NOT_GRANTED => return syscall.fail(error.LockViolation),
.ACCESS_DENIED => return syscall.fail(error.AccessDenied),
@@ -8302,7 +8288,9 @@ fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!us
alertable_syscall.finish();
}
switch (io_status_block.u.Status) {
.SUCCESS, .END_OF_FILE, .PIPE_BROKEN => return io_status_block.Information,
.SUCCESS => return io_status_block.Information,
.END_OF_FILE => return error.EndOfStream,
.PIPE_BROKEN => return error.EndOfStream,
.PENDING => unreachable, // cannot return until the operation completes
.INVALID_DEVICE_REQUEST => return error.IsDir,
.LOCK_NOT_GRANTED => return error.LockViolation,
+1 -2
View File
@@ -984,7 +984,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
var bytes_read: usize = 0;
while (true) {
const n = file.readStreaming(io, &.{pipe_buf[bytes_read..]}) catch |err| switch (err) {
error.WouldBlock => break,
error.WouldBlock, error.EndOfStream => break,
else => |e| {
std.log.debug("failed to read child progress data: {t}", .{e});
main_storage.completed_count = 0;
@@ -992,7 +992,6 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
continue :main_loop;
},
};
if (n == 0) break;
if (opt_saved_metadata) |m| {
if (m.remaining_read_trash_bytes > 0) {
assert(bytes_read == 0);
-1
View File
@@ -418,7 +418,6 @@ pub fn resolveTargetQuery(io: Io, query: Target.Query) DetectError!Target {
error.Canceled => |e| return e,
error.Unexpected => |e| return e,
error.WouldBlock => return error.Unexpected,
error.BrokenPipe => return error.Unexpected,
error.ConnectionResetByPeer => return error.Unexpected,
error.NotOpenForReading => return error.Unexpected,
error.SocketUnconnected => return error.Unexpected,