std.Io.Threaded: improve posix process creation

* cache /dev/null after opening
* make opening /dev/null cancelable
* avoid unreachable even when OS does something unexpected
This commit is contained in:
Andrew Kelley
2026-01-03 19:04:28 -08:00
parent 2c22c3dabf
commit fa315b1060
5 changed files with 157 additions and 119 deletions
+154 -42
View File
@@ -64,7 +64,7 @@ stderr_writer_initialized: bool = false,
argv0: Argv0,
environ: Environ,
nul_handle: if (is_windows) ?windows.HANDLE else void = if (is_windows) null else {},
null_file: NullFile = .{},
pub const Argv0 = switch (native_os) {
.openbsd, .haiku => struct {
@@ -123,6 +123,34 @@ const Environ = struct {
};
};
pub const NullFile = switch (native_os) {
.windows => struct {
handle: ?windows.HANDLE = null,
fn deinit(this: *@This()) void {
if (this.handle) |handle| {
windows.CloseHandle(handle);
this.handle = null;
}
}
},
.wasi, .ios, .tvos, .visionos, .watchos => struct {
fn deinit(this: @This()) void {
_ = this;
}
},
else => struct {
fd: posix.fd_t = -1,
fn deinit(this: *@This()) void {
if (this.fd >= 0) {
posix.close(this.fd);
this.fd = -1;
}
}
},
};
pub const Pid = if (native_os == .linux) enum(posix.pid_t) {
unknown = 0,
_,
@@ -1249,18 +1277,14 @@ pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
pub fn deinit(t: *Threaded) void {
t.join();
if (is_windows) {
if (t.wsa.status == .initialized) {
if (ws2_32.WSACleanup() != 0) recoverableOsBugDetected();
}
if (t.nul_handle) |handle| {
windows.CloseHandle(handle);
}
if (is_windows and t.wsa.status == .initialized) {
if (ws2_32.WSACleanup() != 0) recoverableOsBugDetected();
}
if (posix.Sigaction != void and t.have_signal_handler) {
if (have_sig_io) posix.sigaction(.IO, &t.old_sig_io, null);
if (have_sig_pipe) posix.sigaction(.PIPE, &t.old_sig_pipe, null);
}
t.null_file.deinit();
t.* = undefined;
}
@@ -1429,9 +1453,9 @@ pub fn io(t: *Threaded) Io {
.unlockStderr = unlockStderr,
.processSetCurrentDir = processSetCurrentDir,
.processReplace = processReplace,
.processReplacePath = processReplacePath, // TODO audit for cancelation and unreachable
.processSpawn = processSpawn, // TODO audit for cancelation and unreachable
.processSpawnPath = processSpawnPath, // TODO audit for cancelation and unreachable
.processReplacePath = processReplacePath,
.processSpawn = processSpawn,
.processSpawnPath = processSpawnPath,
.childWait = childWait, // TODO audit for cancelation and unreachable
.childKill = childKill, // TODO audit for cancelation and unreachable
@@ -1656,6 +1680,7 @@ const have_wait4 = switch (native_os) {
else => false,
};
const open_sym = if (posix.lfs64_abi) posix.system.open64 else posix.system.open;
const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat;
const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat;
const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat;
@@ -12856,51 +12881,30 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
// turns out, we `dup2` everything anyway, so there's no need!
const pipe_flags: posix.O = .{ .CLOEXEC = true };
const stdin_pipe = if (options.stdin == .pipe) try posix.pipe2(pipe_flags) else undefined;
const stdin_pipe = if (options.stdin == .pipe) try pipe2(pipe_flags) else undefined;
errdefer if (options.stdin == .pipe) {
destroyPipe(stdin_pipe);
};
const stdout_pipe = if (options.stdout == .pipe) try posix.pipe2(pipe_flags) else undefined;
const stdout_pipe = if (options.stdout == .pipe) try pipe2(pipe_flags) else undefined;
errdefer if (options.stdout == .pipe) {
destroyPipe(stdout_pipe);
};
const stderr_pipe = if (options.stderr == .pipe) try posix.pipe2(pipe_flags) else undefined;
const stderr_pipe = if (options.stderr == .pipe) try pipe2(pipe_flags) else undefined;
errdefer if (options.stderr == .pipe) {
destroyPipe(stderr_pipe);
};
const any_ignore = (options.stdin == .ignore or options.stdout == .ignore or options.stderr == .ignore);
// TODO: cache file handle of /dev/null!
const dev_null_fd = if (any_ignore)
posix.openZ("/dev/null", .{ .ACCMODE = .RDWR }, 0) catch |err| switch (err) {
error.PathAlreadyExists => unreachable,
error.NoSpaceLeft => unreachable,
error.FileTooBig => unreachable,
error.DeviceBusy => unreachable,
error.FileLocksUnsupported => unreachable,
error.BadPathName => unreachable, // Windows-only
error.WouldBlock => unreachable,
error.NetworkNotFound => unreachable, // Windows-only
error.Canceled => unreachable, // temporarily in the posix error set
error.SharingViolation => unreachable, // Windows-only
error.PipeBusy => unreachable, // not a pipe
error.AntivirusInterference => unreachable, // Windows-only
else => |e| return e,
}
else
undefined;
defer {
if (any_ignore) posix.close(dev_null_fd);
}
const dev_null_fd = if (any_ignore) try getDevNullFd(t) else undefined;
const prog_pipe: [2]posix.fd_t = p: {
if (options.progress_node.index == .none) {
break :p .{ -1, -1 };
} else {
// We use CLOEXEC for the same reason as in `pipe_flags`.
break :p try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true });
break :p try pipe2(.{ .NONBLOCK = true, .CLOEXEC = true });
}
};
errdefer destroyPipe(prog_pipe);
@@ -12938,13 +12942,23 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
// This pipe communicates to the parent errors in the child between `fork` and `execvpe`.
// It is closed by the child (via CLOEXEC) without writing if `execvpe` succeeds.
const err_pipe: [2]posix.fd_t = try posix.pipe2(.{ .CLOEXEC = true });
const err_pipe: [2]posix.fd_t = try pipe2(.{ .CLOEXEC = true });
errdefer destroyPipe(err_pipe);
t.scanEnviron(); // for PATH
const PATH = t.environ.string.PATH orelse default_PATH;
const pid_result = try posix.fork();
const pid_result: posix.pid_t = fork: {
const rc = posix.system.fork();
switch (posix.errno(rc)) {
.SUCCESS => break :fork @intCast(rc),
.AGAIN => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOSYS => return error.OperationUnsupported,
else => |err| return posix.unexpectedErrno(err),
}
};
if (pid_result == 0) {
// We are the child.
if (Thread.current) |current_thread| current_thread.cancel_protection = .blocked;
@@ -13030,6 +13044,45 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
};
}
fn getDevNullFd(t: *Threaded) !posix.fd_t {
{
t.mutex.lock();
defer t.mutex.unlock();
if (t.null_file.fd != -1) return t.null_file.fd;
}
const syscall: Syscall = try .start();
while (true) {
const rc = open_sym("/dev/null", .{ .ACCMODE = .RDWR }, 0);
switch (posix.errno(rc)) {
.SUCCESS => {
syscall.finish();
const fresh_fd: posix.fd_t = @intCast(rc);
t.mutex.lock(); // Another thread might have won the race.
defer t.mutex.unlock();
if (t.null_file.fd != -1) {
posix.close(fresh_fd);
return t.null_file.fd;
} else {
t.null_file.fd = fresh_fd;
return fresh_fd;
}
},
.INTR => {
try syscall.checkCancel();
continue;
},
.ACCES => return syscall.fail(error.AccessDenied),
.MFILE => return syscall.fail(error.ProcessFdQuotaExceeded),
.NFILE => return syscall.fail(error.SystemFdQuotaExceeded),
.NODEV => return syscall.fail(error.NoDevice),
.NOENT => return syscall.fail(error.FileNotFound),
.NOMEM => return syscall.fail(error.SystemResources),
.PERM => return syscall.fail(error.PermissionDenied),
else => |err| return syscall.unexpectedErrno(err),
}
}
}
fn processSpawnPosix(userdata: ?*anyopaque, options: process.SpawnOptions) process.SpawnError!process.Child {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const spawned = try spawnPosix(t, options);
@@ -13639,7 +13692,7 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE {
{
t.mutex.lock();
defer t.mutex.unlock();
if (t.nul_handle) |handle| return handle;
if (t.null_file.handle) |handle| return handle;
}
const device_path = [_]u16{ '\\', 'D', 'e', 'v', 'i', 'c', 'e', '\\', 'N', 'u', 'l', 'l' };
@@ -13686,11 +13739,11 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE {
syscall.finish();
t.mutex.lock(); // Another thread might have won the race.
defer t.mutex.unlock();
if (t.nul_handle) |prev_handle| {
if (t.null_file.handle) |prev_handle| {
windows.CloseHandle(fresh_handle);
return prev_handle;
} else {
t.nul_handle = fresh_handle;
t.null_file.handle = fresh_handle;
return fresh_handle;
}
},
@@ -15177,3 +15230,62 @@ fn unpark(tids: []const UnparkTid, addr_hint: ?*const anyopaque) void {
else => comptime unreachable,
}
}
pub const PipeError = error{
SystemFdQuotaExceeded,
ProcessFdQuotaExceeded,
} || Io.UnexpectedError;
pub fn pipe2(flags: posix.O) PipeError![2]posix.fd_t {
var fds: [2]posix.fd_t = undefined;
if (@TypeOf(posix.system.pipe2) != void) {
switch (posix.errno(posix.system.pipe2(&fds, flags))) {
.SUCCESS => return fds,
.INVAL => |err| return errnoBug(err), // Invalid flags
.NFILE => return error.SystemFdQuotaExceeded,
.MFILE => return error.ProcessFdQuotaExceeded,
else => |err| return posix.unexpectedErrno(err),
}
}
switch (posix.errno(posix.system.pipe(&fds))) {
.SUCCESS => {},
.NFILE => return error.SystemFdQuotaExceeded,
.MFILE => return error.ProcessFdQuotaExceeded,
else => |err| return posix.unexpectedErrno(err),
}
errdefer {
posix.close(fds[0]);
posix.close(fds[1]);
}
// https://github.com/ziglang/zig/issues/18882
if (@as(u32, @bitCast(flags)) == 0) return fds;
// CLOEXEC is special, it's a file descriptor flag and must be set using
// F.SETFD.
if (flags.CLOEXEC) for (fds) |fd| {
switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(u32, posix.FD_CLOEXEC)))) {
.SUCCESS => {},
else => |err| return posix.unexpectedErrno(err),
}
};
const new_flags: u32 = f: {
var new_flags = flags;
new_flags.CLOEXEC = false;
break :f @bitCast(new_flags);
};
// Set every other flag affecting the file status using F.SETFL.
if (new_flags != 0) for (fds) |fd| {
switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, new_flags))) {
.SUCCESS => {},
.INVAL => |err| return errnoBug(err),
else => |err| return posix.unexpectedErrno(err),
}
};
return fds;
}
+1 -1
View File
@@ -187,7 +187,7 @@ test "cancel blocked read from pipe" {
.bInheritHandle = std.os.windows.FALSE,
}),
else => {
const pipe = try std.posix.pipe();
const pipe = try std.Io.Threaded.pipe2(.{});
read_end = .{ .handle = pipe[0] };
write_end = .{ .handle = pipe[1] };
},
+1 -1
View File
@@ -280,7 +280,7 @@ test "splice/read" {
var buffer_read = [_]u8{98} ** 20;
try file_src.writeStreamingAll(io, &buffer_write);
const fds = try posix.pipe();
const fds = try std.Io.Threaded.pipe2(.{});
const pipe_offset: u64 = std.math.maxInt(u64);
const sqe_splice_to_pipe = try ring.splice(0x11111111, fd_src, 0, fds[1], pipe_offset, buffer_write.len);
-74
View File
@@ -2105,80 +2105,6 @@ pub fn msync(memory: []align(page_size_min) u8, flags: i32) MSyncError!void {
}
}
pub const PipeError = error{
SystemFdQuotaExceeded,
ProcessFdQuotaExceeded,
} || UnexpectedError;
/// Creates a unidirectional data channel that can be used for interprocess communication.
pub fn pipe() PipeError![2]fd_t {
var fds: [2]fd_t = undefined;
switch (errno(system.pipe(&fds))) {
.SUCCESS => return fds,
.INVAL => unreachable, // Invalid parameters to pipe()
.FAULT => unreachable, // Invalid fds pointer
.NFILE => return error.SystemFdQuotaExceeded,
.MFILE => return error.ProcessFdQuotaExceeded,
else => |err| return unexpectedErrno(err),
}
}
pub fn pipe2(flags: O) PipeError![2]fd_t {
if (@TypeOf(system.pipe2) != void) {
var fds: [2]fd_t = undefined;
switch (errno(system.pipe2(&fds, flags))) {
.SUCCESS => return fds,
.INVAL => unreachable, // Invalid flags
.FAULT => unreachable, // Invalid fds pointer
.NFILE => return error.SystemFdQuotaExceeded,
.MFILE => return error.ProcessFdQuotaExceeded,
else => |err| return unexpectedErrno(err),
}
}
const fds: [2]fd_t = try pipe();
errdefer {
close(fds[0]);
close(fds[1]);
}
// https://github.com/ziglang/zig/issues/18882
if (@as(u32, @bitCast(flags)) == 0)
return fds;
// CLOEXEC is special, it's a file descriptor flag and must be set using
// F.SETFD.
if (flags.CLOEXEC) {
for (fds) |fd| {
switch (errno(system.fcntl(fd, F.SETFD, @as(u32, FD_CLOEXEC)))) {
.SUCCESS => {},
.INVAL => unreachable, // Invalid flags
.BADF => unreachable, // Always a race condition
else => |err| return unexpectedErrno(err),
}
}
}
const new_flags: u32 = f: {
var new_flags = flags;
new_flags.CLOEXEC = false;
break :f @bitCast(new_flags);
};
// Set every other flag affecting the file status using F.SETFL.
if (new_flags != 0) {
for (fds) |fd| {
switch (errno(system.fcntl(fd, F.SETFL, new_flags))) {
.SUCCESS => {},
.INVAL => unreachable, // Invalid flags
.BADF => unreachable, // Always a race condition
else => |err| return unexpectedErrno(err),
}
}
}
return fds;
}
pub const SysCtlError = error{
PermissionDenied,
SystemResources,
+1 -1
View File
@@ -131,7 +131,7 @@ test "pipe" {
if (native_os == .windows or native_os == .wasi)
return error.SkipZigTest;
const fds = try posix.pipe();
const fds = try std.Io.Threaded.pipe2(.{});
try expect((try posix.write(fds[1], "hello")) == 5);
var buf: [16]u8 = undefined;
try expect((try posix.read(fds[0], buf[0..])) == 5);