diff --git a/lib/std/Io.zig b/lib/std/Io.zig index cd846e5dae..a5584da61e 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1184,8 +1184,6 @@ pub fn Select(comptime U: type) type { return struct { io: Io, group: Group, - /// The queue is never closed because there may be live resources - /// inserted into it which would otherwise leak. queue: Queue(U), const S = @This(); @@ -1235,7 +1233,7 @@ pub fn Select(comptime U: type) type { const raw_result = @call(.auto, function, context.args); const elem = @unionInit(U, @tagName(field), raw_result); context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) { - error.Closed => unreachable, + error.Closed => {}, }; if (@typeInfo(@TypeOf(raw_result)) == .error_union) _ = raw_result catch |err| if (err == error.Canceled) return error.Canceled; @@ -1274,7 +1272,7 @@ pub fn Select(comptime U: type) type { const raw_result = @call(.auto, function, context.args); const elem = @unionInit(U, @tagName(field), raw_result); context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) { - error.Closed => unreachable, + error.Closed => {}, }; if (@typeInfo(@TypeOf(raw_result)) == .error_union) _ = raw_result catch |err| if (err == error.Canceled) return error.Canceled; @@ -1286,6 +1284,8 @@ pub fn Select(comptime U: type) type { /// Blocks until another task of the select finishes. /// + /// It is legal to call `async` and `concurrent` after this. + /// /// Threadsafe. pub fn await(s: *S) Cancelable!U { return s.queue.getOne(s.io) catch |err| switch (err) { @@ -1299,6 +1299,8 @@ pub fn Select(comptime U: type) type { /// /// Asserts that `buffer.len >= min`. /// + /// It is legal to call `async` and `concurrent` after this. + /// /// Threadsafe. pub fn awaitMany(s: *S, buffer: []U, min: usize) Cancelable!usize { return s.queue.get(s.io, buffer, min) catch |err| switch (err) { @@ -1307,16 +1309,53 @@ pub fn Select(comptime U: type) type { }; } - /// Equivalent to `await` but requests cancelation on all remaining - /// tasks owned by the select. + /// Requests cancelation on all remaining tasks owned by the select, + /// then blocks until they all finish. If the select was initialized + /// with insufficient buffer space for all remaining tasks to finish, a + /// deadlock occurs. /// - /// For a description of cancelation and cancelation points, see `Future.cancel`. + /// If any of the select tasks allocate resources, those tasks may have + /// completed, meaning that this function must be called in a loop + /// until `null` is returned in order to deallocate those resources. If + /// there is no possibility of resource leaks, `cancelDiscard` is + /// preferable. /// - /// It is illegal to call `await` after this. + /// It is illegal to call `await` or `awaitMany` after this. /// - /// Idempotent. Threadsafe. - pub fn cancel(s: *S) void { - s.group.cancel(s.io); + /// It is safe to call this multiple times, even after `null` is + /// returned. + /// + /// Threadsafe. + pub fn cancel(s: *S) ?U { + const io = s.io; + if (s.group.token.load(.acquire)) |token| { + io.vtable.groupCancel(io.userdata, &s.group, token); + assert(s.group.token.raw == null); + s.queue.close(io); + } + return s.queue.getOneUncancelable(io) catch |err| switch (err) { + error.Closed => return null, + }; + } + + /// Requests cancelation on all remaining tasks owned by the select, + /// then blocks until they all finish. + /// + /// All return values from outstanding tasks are discarded. This + /// function is therefore inappropriate to call when a task can return + /// an allocated resource. For that use case, see `cancel`. + /// + /// It is illegal to call `await` or `awaitMany` after this. + /// + /// It is safe to call this multiple times. + /// + /// Threadsafe. + pub fn cancelDiscard(s: *S) void { + const io = s.io; + const token = s.group.token.load(.acquire) orelse return; + s.queue.close(io); + io.vtable.groupCancel(io.userdata, &s.group, token); + assert(s.group.token.raw == null); } }; } @@ -1693,6 +1732,12 @@ pub const TypeErasedQueue = struct { }; } + /// After this is called, the queue enters a "closed" state. A closed + /// queue always returns `error.Closed` for put attempts even when + /// there is space in the buffer. However, existing elements of the + /// queue are retrieved before `error.Closed` is returned. + /// + /// Threadsafe. pub fn close(q: *TypeErasedQueue, io: Io) void { q.mutex.lockUncancelable(io); defer q.mutex.unlock(io); @@ -1967,6 +2012,12 @@ pub fn Queue(Elem: type) type { return .{ .type_erased = .init(@ptrCast(buffer)) }; } + /// After this is called, the queue enters a "closed" state. A closed + /// queue always returns `error.Closed` for put attempts even when + /// there is space in the buffer. However, existing elements of the + /// queue are retrieved before `error.Closed` is returned. + /// + /// Threadsafe. pub fn close(q: *@This(), io: Io) void { q.type_erased.close(io); } diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index 5da87cca28..587c23eb85 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -835,7 +835,7 @@ test "Select" { }; var buffer: [4]U = undefined; var select: Io.Select(U) = .init(io, &buffer); - defer select.cancel(); + defer _ = select.cancel(); select.async(.foo, S.foo, .{}); select.concurrent(.bar, S.bar, .{io}) catch |err| switch (err) { @@ -864,3 +864,27 @@ test "Select" { try testing.expectEqual(42, result); } + +test "Select with empty buffer, no deadlock" { + const S = struct { + fn sleeper(io: Io, duration: Io.Duration) Io.Cancelable!void { + try io.sleep(duration, .awake); + } + }; + + const io = testing.io; + + const U = union(enum) { + sleeper: Io.Cancelable!void, + }; + var select: Io.Select(U) = .init(io, &.{}); + defer select.cancelDiscard(); + + select.concurrent(.sleeper, S.sleeper, .{ io, .fromNanoseconds(1) }) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + select.concurrent(.sleeper, S.sleeper, .{ io, .fromSeconds(600) }) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + assert((try select.await()) == .sleeper); +}