diff --git a/lib/std/process.zig b/lib/std/process.zig index 4a021879a5..b203838e3f 100644 --- a/lib/std/process.zig +++ b/lib/std/process.zig @@ -455,6 +455,7 @@ pub fn spawnPath(io: Io, dir: Io.Dir, options: SpawnOptions) SpawnError!Child { pub const RunError = CurrentPathError || posix.ReadError || SpawnError || posix.PollError || error{ StreamTooLong, + ConcurrencyUnavailable, }; pub const RunOptions = struct { diff --git a/lib/std/process/Child.zig b/lib/std/process/Child.zig index fc31014520..e64f6106fa 100644 --- a/lib/std/process/Child.zig +++ b/lib/std/process/Child.zig @@ -125,14 +125,15 @@ pub fn wait(child: *Child, io: Io) WaitError!Term { return io.vtable.childWait(io.userdata, child); } -pub const CollectOutputError = error{StreamTooLong} || Allocator.Error || Io.File.Reader.Error; +pub const CollectOutputError = error{ + StreamTooLong, + ConcurrencyUnavailable, +} || Allocator.Error || Io.File.Reader.Error; pub const CollectOutputOptions = struct { stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), - /// Used for `stdout` and `stderr`. If not provided, only the existing - /// capacity will be used. - allocator: ?Allocator = null, + allocator: Allocator, stdout_limit: Io.Limit = .unlimited, stderr_limit: Io.Limit = .unlimited, }; @@ -144,56 +145,24 @@ pub const CollectOutputOptions = struct { /// The process must have been started with stdout and stderr set to /// `process.SpawnOptions.StdIo.pipe`. pub fn collectOutput(child: *const Child, io: Io, options: CollectOutputOptions) CollectOutputError!void { - const files: [2]Io.File = .{ child.stdout.?, child.stderr.? }; - const lists: [2]*std.ArrayList(u8) = .{ options.stdout, options.stderr }; - const limits: [2]Io.Limit = .{ options.stdout_limit, options.stderr_limit }; - var dones: [2]bool = .{ false, false }; - var reads: [2]Io.Operation = undefined; - var vecs: [2][1][]u8 = undefined; - while (true) { - for (&reads, &lists, &files, dones, &vecs) |*read, list, file, done, *vec| { - if (done) { - read.* = .noop; - continue; - } - if (options.allocator) |gpa| try list.ensureUnusedCapacity(gpa, 1); - const cap = list.unusedCapacitySlice(); - if (cap.len == 0) return error.StreamTooLong; - vec[0] = cap; - read.* = .{ .file_read_streaming = .{ - .file = file, - .data = vec, - .nonblocking = true, - .result = undefined, - } }; - } - var all_done = true; - var any_canceled = false; - var other_err: (error{StreamTooLong} || Io.File.Reader.Error)!void = {}; - io.vtable.operate(io.userdata, &reads); - for (&reads, &lists, &limits, &dones) |*read, list, limit, *done| { - if (done.*) continue; - const n = read.file_read_streaming.result catch |err| switch (err) { - error.Canceled => { - any_canceled = true; - continue; - }, - error.WouldBlock => continue, - else => |e| { - other_err = e; - continue; - }, - }; - if (n == 0) { - done.* = true; - } else { - all_done = false; - } - list.items.len += n; - if (list.items.len > @intFromEnum(limit)) other_err = error.StreamTooLong; - } - if (any_canceled) return error.Canceled; - try other_err; - if (all_done) return; - } + var stdout = try io.concurrent(collectStream, .{ + io, options.allocator, child.stdout.?, options.stdout, options.stdout_limit, + }); + defer stdout.cancel(io) catch {}; + + var stderr = try io.concurrent(collectStream, .{ + io, options.allocator, child.stderr.?, options.stderr, options.stderr_limit, + }); + defer stderr.cancel(io) catch {}; + + try stdout.await(io); + try stderr.await(io); +} + +fn collectStream(io: Io, gpa: Allocator, file: File, list: *std.ArrayList(u8), limit: Io.Limit) CollectOutputError!void { + var fr = file.readerStreaming(io, &.{}); + fr.interface.appendRemaining(gpa, list, limit) catch |err| switch (err) { + error.ReadFailed => return fr.err.?, + else => |e| return e, + }; }