diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 5fab16d429..e654afaacd 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -64,7 +64,7 @@ stderr_writer_initialized: bool = false, argv0: Argv0, environ: Environ, -nul_handle: if (is_windows) ?windows.HANDLE else void = if (is_windows) null else {}, +null_file: NullFile = .{}, pub const Argv0 = switch (native_os) { .openbsd, .haiku => struct { @@ -123,6 +123,34 @@ const Environ = struct { }; }; +pub const NullFile = switch (native_os) { + .windows => struct { + handle: ?windows.HANDLE = null, + + fn deinit(this: *@This()) void { + if (this.handle) |handle| { + windows.CloseHandle(handle); + this.handle = null; + } + } + }, + .wasi, .ios, .tvos, .visionos, .watchos => struct { + fn deinit(this: @This()) void { + _ = this; + } + }, + else => struct { + fd: posix.fd_t = -1, + + fn deinit(this: *@This()) void { + if (this.fd >= 0) { + posix.close(this.fd); + this.fd = -1; + } + } + }, +}; + pub const Pid = if (native_os == .linux) enum(posix.pid_t) { unknown = 0, _, @@ -1249,18 +1277,14 @@ pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void { pub fn deinit(t: *Threaded) void { t.join(); - if (is_windows) { - if (t.wsa.status == .initialized) { - if (ws2_32.WSACleanup() != 0) recoverableOsBugDetected(); - } - if (t.nul_handle) |handle| { - windows.CloseHandle(handle); - } + if (is_windows and t.wsa.status == .initialized) { + if (ws2_32.WSACleanup() != 0) recoverableOsBugDetected(); } if (posix.Sigaction != void and t.have_signal_handler) { if (have_sig_io) posix.sigaction(.IO, &t.old_sig_io, null); if (have_sig_pipe) posix.sigaction(.PIPE, &t.old_sig_pipe, null); } + t.null_file.deinit(); t.* = undefined; } @@ -1429,9 +1453,9 @@ pub fn io(t: *Threaded) Io { .unlockStderr = unlockStderr, .processSetCurrentDir = processSetCurrentDir, .processReplace = processReplace, - .processReplacePath = processReplacePath, // TODO audit for cancelation and unreachable - .processSpawn = processSpawn, // TODO audit for cancelation and unreachable - .processSpawnPath = processSpawnPath, // TODO audit for cancelation and unreachable + .processReplacePath = processReplacePath, + .processSpawn = processSpawn, + .processSpawnPath = processSpawnPath, .childWait = childWait, // TODO audit for cancelation and unreachable .childKill = childKill, // TODO audit for cancelation and unreachable @@ -1656,6 +1680,7 @@ const have_wait4 = switch (native_os) { else => false, }; +const open_sym = if (posix.lfs64_abi) posix.system.open64 else posix.system.open; const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat; const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat; const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat; @@ -12856,51 +12881,30 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp // turns out, we `dup2` everything anyway, so there's no need! const pipe_flags: posix.O = .{ .CLOEXEC = true }; - const stdin_pipe = if (options.stdin == .pipe) try posix.pipe2(pipe_flags) else undefined; + const stdin_pipe = if (options.stdin == .pipe) try pipe2(pipe_flags) else undefined; errdefer if (options.stdin == .pipe) { destroyPipe(stdin_pipe); }; - const stdout_pipe = if (options.stdout == .pipe) try posix.pipe2(pipe_flags) else undefined; + const stdout_pipe = if (options.stdout == .pipe) try pipe2(pipe_flags) else undefined; errdefer if (options.stdout == .pipe) { destroyPipe(stdout_pipe); }; - const stderr_pipe = if (options.stderr == .pipe) try posix.pipe2(pipe_flags) else undefined; + const stderr_pipe = if (options.stderr == .pipe) try pipe2(pipe_flags) else undefined; errdefer if (options.stderr == .pipe) { destroyPipe(stderr_pipe); }; const any_ignore = (options.stdin == .ignore or options.stdout == .ignore or options.stderr == .ignore); - // TODO: cache file handle of /dev/null! - const dev_null_fd = if (any_ignore) - posix.openZ("/dev/null", .{ .ACCMODE = .RDWR }, 0) catch |err| switch (err) { - error.PathAlreadyExists => unreachable, - error.NoSpaceLeft => unreachable, - error.FileTooBig => unreachable, - error.DeviceBusy => unreachable, - error.FileLocksUnsupported => unreachable, - error.BadPathName => unreachable, // Windows-only - error.WouldBlock => unreachable, - error.NetworkNotFound => unreachable, // Windows-only - error.Canceled => unreachable, // temporarily in the posix error set - error.SharingViolation => unreachable, // Windows-only - error.PipeBusy => unreachable, // not a pipe - error.AntivirusInterference => unreachable, // Windows-only - else => |e| return e, - } - else - undefined; - defer { - if (any_ignore) posix.close(dev_null_fd); - } + const dev_null_fd = if (any_ignore) try getDevNullFd(t) else undefined; const prog_pipe: [2]posix.fd_t = p: { if (options.progress_node.index == .none) { break :p .{ -1, -1 }; } else { // We use CLOEXEC for the same reason as in `pipe_flags`. - break :p try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); + break :p try pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); } }; errdefer destroyPipe(prog_pipe); @@ -12938,13 +12942,23 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp // This pipe communicates to the parent errors in the child between `fork` and `execvpe`. // It is closed by the child (via CLOEXEC) without writing if `execvpe` succeeds. - const err_pipe: [2]posix.fd_t = try posix.pipe2(.{ .CLOEXEC = true }); + const err_pipe: [2]posix.fd_t = try pipe2(.{ .CLOEXEC = true }); errdefer destroyPipe(err_pipe); t.scanEnviron(); // for PATH const PATH = t.environ.string.PATH orelse default_PATH; - const pid_result = try posix.fork(); + const pid_result: posix.pid_t = fork: { + const rc = posix.system.fork(); + switch (posix.errno(rc)) { + .SUCCESS => break :fork @intCast(rc), + .AGAIN => return error.SystemResources, + .NOMEM => return error.SystemResources, + .NOSYS => return error.OperationUnsupported, + else => |err| return posix.unexpectedErrno(err), + } + }; + if (pid_result == 0) { // We are the child. if (Thread.current) |current_thread| current_thread.cancel_protection = .blocked; @@ -13030,6 +13044,45 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp }; } +fn getDevNullFd(t: *Threaded) !posix.fd_t { + { + t.mutex.lock(); + defer t.mutex.unlock(); + if (t.null_file.fd != -1) return t.null_file.fd; + } + const syscall: Syscall = try .start(); + while (true) { + const rc = open_sym("/dev/null", .{ .ACCMODE = .RDWR }, 0); + switch (posix.errno(rc)) { + .SUCCESS => { + syscall.finish(); + const fresh_fd: posix.fd_t = @intCast(rc); + t.mutex.lock(); // Another thread might have won the race. + defer t.mutex.unlock(); + if (t.null_file.fd != -1) { + posix.close(fresh_fd); + return t.null_file.fd; + } else { + t.null_file.fd = fresh_fd; + return fresh_fd; + } + }, + .INTR => { + try syscall.checkCancel(); + continue; + }, + .ACCES => return syscall.fail(error.AccessDenied), + .MFILE => return syscall.fail(error.ProcessFdQuotaExceeded), + .NFILE => return syscall.fail(error.SystemFdQuotaExceeded), + .NODEV => return syscall.fail(error.NoDevice), + .NOENT => return syscall.fail(error.FileNotFound), + .NOMEM => return syscall.fail(error.SystemResources), + .PERM => return syscall.fail(error.PermissionDenied), + else => |err| return syscall.unexpectedErrno(err), + } + } +} + fn processSpawnPosix(userdata: ?*anyopaque, options: process.SpawnOptions) process.SpawnError!process.Child { const t: *Threaded = @ptrCast(@alignCast(userdata)); const spawned = try spawnPosix(t, options); @@ -13639,7 +13692,7 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE { { t.mutex.lock(); defer t.mutex.unlock(); - if (t.nul_handle) |handle| return handle; + if (t.null_file.handle) |handle| return handle; } const device_path = [_]u16{ '\\', 'D', 'e', 'v', 'i', 'c', 'e', '\\', 'N', 'u', 'l', 'l' }; @@ -13686,11 +13739,11 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE { syscall.finish(); t.mutex.lock(); // Another thread might have won the race. defer t.mutex.unlock(); - if (t.nul_handle) |prev_handle| { + if (t.null_file.handle) |prev_handle| { windows.CloseHandle(fresh_handle); return prev_handle; } else { - t.nul_handle = fresh_handle; + t.null_file.handle = fresh_handle; return fresh_handle; } }, @@ -15177,3 +15230,62 @@ fn unpark(tids: []const UnparkTid, addr_hint: ?*const anyopaque) void { else => comptime unreachable, } } + +pub const PipeError = error{ + SystemFdQuotaExceeded, + ProcessFdQuotaExceeded, +} || Io.UnexpectedError; + +pub fn pipe2(flags: posix.O) PipeError![2]posix.fd_t { + var fds: [2]posix.fd_t = undefined; + + if (@TypeOf(posix.system.pipe2) != void) { + switch (posix.errno(posix.system.pipe2(&fds, flags))) { + .SUCCESS => return fds, + .INVAL => |err| return errnoBug(err), // Invalid flags + .NFILE => return error.SystemFdQuotaExceeded, + .MFILE => return error.ProcessFdQuotaExceeded, + else => |err| return posix.unexpectedErrno(err), + } + } + + switch (posix.errno(posix.system.pipe(&fds))) { + .SUCCESS => {}, + .NFILE => return error.SystemFdQuotaExceeded, + .MFILE => return error.ProcessFdQuotaExceeded, + else => |err| return posix.unexpectedErrno(err), + } + errdefer { + posix.close(fds[0]); + posix.close(fds[1]); + } + + // https://github.com/ziglang/zig/issues/18882 + if (@as(u32, @bitCast(flags)) == 0) return fds; + + // CLOEXEC is special, it's a file descriptor flag and must be set using + // F.SETFD. + if (flags.CLOEXEC) for (fds) |fd| { + switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(u32, posix.FD_CLOEXEC)))) { + .SUCCESS => {}, + else => |err| return posix.unexpectedErrno(err), + } + }; + + const new_flags: u32 = f: { + var new_flags = flags; + new_flags.CLOEXEC = false; + break :f @bitCast(new_flags); + }; + + // Set every other flag affecting the file status using F.SETFL. + if (new_flags != 0) for (fds) |fd| { + switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, new_flags))) { + .SUCCESS => {}, + .INVAL => |err| return errnoBug(err), + else => |err| return posix.unexpectedErrno(err), + } + }; + + return fds; +} diff --git a/lib/std/Io/Threaded/test.zig b/lib/std/Io/Threaded/test.zig index 774949d676..34aa4a8c68 100644 --- a/lib/std/Io/Threaded/test.zig +++ b/lib/std/Io/Threaded/test.zig @@ -187,7 +187,7 @@ test "cancel blocked read from pipe" { .bInheritHandle = std.os.windows.FALSE, }), else => { - const pipe = try std.posix.pipe(); + const pipe = try std.Io.Threaded.pipe2(.{}); read_end = .{ .handle = pipe[0] }; write_end = .{ .handle = pipe[1] }; }, diff --git a/lib/std/os/linux/IoUring/test.zig b/lib/std/os/linux/IoUring/test.zig index 251ba14a8b..899a6dae64 100644 --- a/lib/std/os/linux/IoUring/test.zig +++ b/lib/std/os/linux/IoUring/test.zig @@ -280,7 +280,7 @@ test "splice/read" { var buffer_read = [_]u8{98} ** 20; try file_src.writeStreamingAll(io, &buffer_write); - const fds = try posix.pipe(); + const fds = try std.Io.Threaded.pipe2(.{}); const pipe_offset: u64 = std.math.maxInt(u64); const sqe_splice_to_pipe = try ring.splice(0x11111111, fd_src, 0, fds[1], pipe_offset, buffer_write.len); diff --git a/lib/std/posix.zig b/lib/std/posix.zig index 33cccb64f4..fed55f34cf 100644 --- a/lib/std/posix.zig +++ b/lib/std/posix.zig @@ -2105,80 +2105,6 @@ pub fn msync(memory: []align(page_size_min) u8, flags: i32) MSyncError!void { } } -pub const PipeError = error{ - SystemFdQuotaExceeded, - ProcessFdQuotaExceeded, -} || UnexpectedError; - -/// Creates a unidirectional data channel that can be used for interprocess communication. -pub fn pipe() PipeError![2]fd_t { - var fds: [2]fd_t = undefined; - switch (errno(system.pipe(&fds))) { - .SUCCESS => return fds, - .INVAL => unreachable, // Invalid parameters to pipe() - .FAULT => unreachable, // Invalid fds pointer - .NFILE => return error.SystemFdQuotaExceeded, - .MFILE => return error.ProcessFdQuotaExceeded, - else => |err| return unexpectedErrno(err), - } -} - -pub fn pipe2(flags: O) PipeError![2]fd_t { - if (@TypeOf(system.pipe2) != void) { - var fds: [2]fd_t = undefined; - switch (errno(system.pipe2(&fds, flags))) { - .SUCCESS => return fds, - .INVAL => unreachable, // Invalid flags - .FAULT => unreachable, // Invalid fds pointer - .NFILE => return error.SystemFdQuotaExceeded, - .MFILE => return error.ProcessFdQuotaExceeded, - else => |err| return unexpectedErrno(err), - } - } - - const fds: [2]fd_t = try pipe(); - errdefer { - close(fds[0]); - close(fds[1]); - } - - // https://github.com/ziglang/zig/issues/18882 - if (@as(u32, @bitCast(flags)) == 0) - return fds; - - // CLOEXEC is special, it's a file descriptor flag and must be set using - // F.SETFD. - if (flags.CLOEXEC) { - for (fds) |fd| { - switch (errno(system.fcntl(fd, F.SETFD, @as(u32, FD_CLOEXEC)))) { - .SUCCESS => {}, - .INVAL => unreachable, // Invalid flags - .BADF => unreachable, // Always a race condition - else => |err| return unexpectedErrno(err), - } - } - } - - const new_flags: u32 = f: { - var new_flags = flags; - new_flags.CLOEXEC = false; - break :f @bitCast(new_flags); - }; - // Set every other flag affecting the file status using F.SETFL. - if (new_flags != 0) { - for (fds) |fd| { - switch (errno(system.fcntl(fd, F.SETFL, new_flags))) { - .SUCCESS => {}, - .INVAL => unreachable, // Invalid flags - .BADF => unreachable, // Always a race condition - else => |err| return unexpectedErrno(err), - } - } - } - - return fds; -} - pub const SysCtlError = error{ PermissionDenied, SystemResources, diff --git a/lib/std/posix/test.zig b/lib/std/posix/test.zig index d79dc547c3..f385a82663 100644 --- a/lib/std/posix/test.zig +++ b/lib/std/posix/test.zig @@ -131,7 +131,7 @@ test "pipe" { if (native_os == .windows or native_os == .wasi) return error.SkipZigTest; - const fds = try posix.pipe(); + const fds = try std.Io.Threaded.pipe2(.{}); try expect((try posix.write(fds[1], "hello")) == 5); var buf: [16]u8 = undefined; try expect((try posix.read(fds[0], buf[0..])) == 5);