From 2674acdb77f8a144eaedbfa57a1b7fdc70dc1e60 Mon Sep 17 00:00:00 2001 From: Jacob Young Date: Fri, 30 Jan 2026 01:44:07 -0500 Subject: [PATCH] Io.Batch: implement alternate API --- lib/std/Io.zig | 245 +++++++------ lib/std/Io/File.zig | 7 +- lib/std/Io/File/MultiReader.zig | 52 ++- lib/std/Io/Threaded.zig | 589 +++++++++++++++++++------------- 4 files changed, 517 insertions(+), 376 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index c00e4619e8..ca77a9836a 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -149,8 +149,9 @@ pub const VTable = struct { futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void, futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void, - operate: *const fn (?*anyopaque, *Operation) Cancelable!void, - batchWait: *const fn (?*anyopaque, *Batch, Timeout) Batch.WaitError!void, + operate: *const fn (?*anyopaque, Operation) Cancelable!Operation.Result, + batchAwaitAsync: *const fn (?*anyopaque, *Batch) Batch.AwaitAsyncError!void, + batchAwaitConcurrent: *const fn (?*anyopaque, *Batch, Timeout) Batch.AwaitConcurrentError!void, batchCancel: *const fn (?*anyopaque, *Batch) void, dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void, @@ -255,19 +256,14 @@ pub const VTable = struct { }; pub const Operation = union(enum) { - noop: Noop, file_read_streaming: FileReadStreaming, - pub const Noop = struct { - reserved: [2]usize = .{ 0, 0 }, - status: Status(void) = .{ .unstarted = {} }, - }; + pub const Tag = @typeInfo(Operation).@"union".tag_type.?; /// May return 0 reads which is different than `error.EndOfStream`. pub const FileReadStreaming = struct { file: File, data: []const []u8, - status: Status(Error!usize) = .{ .unstarted = {} }, pub const Error = UnendingError || error{EndOfStream}; pub const UnendingError = error{ @@ -290,19 +286,72 @@ pub const Operation = union(enum) { /// lock. 
LockViolation, } || Io.UnexpectedError; + + pub const Result = usize; }; - pub fn Status(Result: type) type { - return union { - unstarted: void, - pending: *Batch, + pub const Result = Result: { + const operation_fields = @typeInfo(Operation).@"union".fields; + var field_names: [operation_fields.len][]const u8 = undefined; + var field_types: [operation_fields.len]type = undefined; + for (operation_fields, &field_names, &field_types) |field, *field_name, *field_type| { + field_name.* = field.name; + field_type.* = field.type.Error!field.type.Result; + } + break :Result @Union(.auto, Tag, &field_names, &field_types, &@splat(.{})); + }; + + pub const Storage = union { + unused: List.DoubleNode, + submission: Submission, + pending: Pending, + completion: Completion, + + pub const Submission = struct { + node: List.SingleNode, + operation: Operation, + }; + + pub const Pending = struct { + node: List.DoubleNode, + tag: Tag, + context: [3]usize, + }; + + pub const Completion = struct { + node: List.SingleNode, result: Result, }; - } + }; + + pub const OptionalIndex = enum(u32) { + none = std.math.maxInt(u32), + _, + + pub fn fromIndex(i: usize) OptionalIndex { + const oi: OptionalIndex = @enumFromInt(i); + assert(oi != .none); + return oi; + } + + pub fn toIndex(oi: OptionalIndex) u32 { + assert(oi != .none); + return @intFromEnum(oi); + } + }; + pub const List = struct { + head: OptionalIndex, + tail: OptionalIndex, + + pub const empty: List = .{ .head = .none, .tail = .none }; + + pub const SingleNode = struct { next: OptionalIndex }; + pub const DoubleNode = struct { prev: OptionalIndex, next: OptionalIndex }; + }; }; /// Performs one `Operation`. 
-pub fn operate(io: Io, operation: *Operation) Cancelable!void { +pub fn operate(io: Io, operation: Operation) Cancelable!Operation.Result { return io.vtable.operate(io.userdata, operation); } @@ -312,116 +361,96 @@ pub fn operate(io: Io, operation: *Operation) Cancelable!void { /// This is a low-level abstraction based on `Operation`. For a higher /// level API that operates on `Future`, see `Select`. pub const Batch = struct { - operations: []Operation, - ring: [*]u32, - user: struct { - submit_tail: RingIndex, - complete_head: RingIndex, - complete_tail: RingIndex, - }, - impl: struct { - submit_head: RingIndex, - submit_tail: RingIndex, - complete_tail: RingIndex, - reserved: ?*anyopaque, - }, - - pub const RingIndex = enum(u32) { - _, - - pub fn index(ri: RingIndex, len: u31) u31 { - const i = @intFromEnum(ri); - assert(i < @as(u32, len) * 2); - return @intCast(if (i < len) i else i - len); - } - - pub fn prev(ri: RingIndex, len: u31) RingIndex { - const i = @intFromEnum(ri); - const double_len = @as(u32, len) * 2; - assert(i <= double_len); - return @enumFromInt((if (i > 0) i else double_len) - 1); - } - - pub fn next(ri: RingIndex, len: u31) RingIndex { - const i = @intFromEnum(ri) + 1; - const double_len = @as(u32, len) * 2; - assert(i <= double_len); - return @enumFromInt(if (i < double_len) i else 0); - } - }; + storage: []Operation.Storage, + unused: Operation.List, + submissions: Operation.List, + pending: Operation.List, + completions: Operation.List, + context: ?*anyopaque, /// After calling this, it is safe to unconditionally defer a call to /// `cancel`. - pub fn init(operations: []Operation, ring: []u32) Batch { - const len: u31 = @intCast(operations.len); - assert(ring.len == len); + pub fn init(storage: []Operation.Storage) Batch { + var prev: Operation.OptionalIndex = .none; + for (storage, 0..) 
|*operation, index| { + operation.* = .{ .unused = .{ .prev = prev, .next = .fromIndex(index + 1) } }; + prev = .fromIndex(index); + } + storage[storage.len - 1].unused.next = .none; return .{ - .operations = operations, - .ring = ring.ptr, - .user = .{ - .submit_tail = @enumFromInt(0), - .complete_head = @enumFromInt(0), - .complete_tail = @enumFromInt(0), - }, - .impl = .{ - .submit_head = @enumFromInt(0), - .submit_tail = @enumFromInt(0), - .complete_tail = @enumFromInt(0), - .reserved = null, + .storage = storage, + .unused = .{ + .head = .fromIndex(0), + .tail = .fromIndex(storage.len - 1), }, + .submissions = .empty, + .pending = .empty, + .completions = .empty, + .context = null, }; } - /// Adds `b.operations[operation]` to the list of submitted operations - /// that will be performed when `wait` is called. - pub fn add(b: *Batch, operation: usize) void { - const tail = b.user.submit_tail; - const len: u31 = @intCast(b.operations.len); - b.user.submit_tail = tail.next(len); - b.ring[0..len][tail.index(len)] = @intCast(operation); + /// Adds an operation to be performed at the next await call. + /// Returns the index that will be returned by `next` after the operation completes. + /// Asserts that no more than `storage.len` operations are active at a time. + pub fn add(b: *Batch, operation: Operation) u32 { + const index = b.unused.head; + b.addAt(index.toIndex(), operation); + return index.toIndex(); } - fn flush(b: *Batch) void { - @atomicStore(RingIndex, &b.impl.submit_tail, b.user.submit_tail, .release); - } - - /// Returns `operation` such that `b.operations[operation]` has completed. - /// Returns `null` when `wait` should be called. 
- pub fn next(b: *Batch) ?u32 { - const head = b.user.complete_head; - if (head == b.user.complete_tail) { - @branchHint(.unlikely); - b.flush(); - const tail = @atomicLoad(RingIndex, &b.impl.complete_tail, .acquire); - if (head == tail) { - @branchHint(.unlikely); - return null; - } - assert(head != tail); - b.user.complete_tail = tail; + /// Adds an operation to be performed at the next await call. + /// After the operation completes, `next` will return `index`. + /// Asserts that the operation at `index` is not active. + pub fn addAt(b: *Batch, index: u32, operation: Operation) void { + const storage = &b.storage[index]; + const unused = storage.unused; + switch (unused.prev) { + .none => b.unused.head = unused.next, + else => |prev_index| b.storage[prev_index.toIndex()].unused.next = unused.next, } - const len: u31 = @intCast(b.operations.len); - b.user.complete_head = head.next(len); - return b.ring[0..len][head.index(len)]; + switch (unused.next) { + .none => b.unused.tail = unused.prev, + else => |next_index| b.storage[next_index.toIndex()].unused.prev = unused.prev, + } + + switch (b.submissions.tail) { + .none => b.submissions.head = .fromIndex(index), + else => |tail_index| b.storage[tail_index.toIndex()].submission.node.next = .fromIndex(index), + } + storage.* = .{ .submission = .{ .node = .{ .next = .none }, .operation = operation } }; + b.submissions.tail = .fromIndex(index); } - pub const WaitError = ConcurrentError || Cancelable || Timeout.Error; + pub fn next(b: *Batch) ?struct { index: u32, result: Operation.Result } { + const index = b.completions.head; + if (index == .none) return null; + const storage = &b.storage[index.toIndex()]; + const completion = storage.completion; + const next_index = completion.node.next; + b.completions.head = next_index; + if (next_index == .none) b.completions.tail = .none; - /// Starts work on any submitted operations and returns when at least one has completeed. - /// - /// Returns `error.Timeout` if `timeout` expires first. 
- /// - /// Depending on the `Io` implementation, may allocate resources that are - /// freed with `cancel`, even if an error is returned. - pub fn wait(b: *Batch, io: Io, timeout: Timeout) WaitError!void { - return io.vtable.batchWait(io.userdata, b, timeout); + const tail_index = b.unused.tail; + switch (tail_index) { + .none => b.unused.head = index, + else => b.storage[tail_index.toIndex()].unused.next = index, + } + storage.* = .{ .unused = .{ .prev = tail_index, .next = .none } }; + b.unused.tail = index; + return .{ .index = index.toIndex(), .result = completion.result }; + } + + pub const AwaitAsyncError = Cancelable; + pub fn awaitAsync(b: *Batch, io: Io) AwaitAsyncError!void { + return io.vtable.batchAwaitAsync(io.userdata, b); + } + + pub const AwaitConcurrentError = ConcurrentError || Cancelable || Timeout.Error; + pub fn awaitConcurrent(b: *Batch, io: Io, timeout: Timeout) AwaitConcurrentError!void { + return io.vtable.batchAwaitConcurrent(io.userdata, b, timeout); } - /// Returns after all `operations` have completed. Operations which have not completed - /// after this function returns were successfully dropped and had no side effects. - /// - /// This function is idempotent with respect to itself and `wait`. It is - /// safe to unconditionally `defer` a call to this function after `init`. 
pub fn cancel(b: *Batch, io: Io) void { return io.vtable.batchCancel(io.userdata, b); } diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig index df5b3b5a53..5db6f81ac6 100644 --- a/lib/std/Io/File.zig +++ b/lib/std/Io/File.zig @@ -559,12 +559,11 @@ pub const ReadStreamingError = error{EndOfStream} || Reader.Error; /// See also: /// * `reader` pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize { - var operation: Io.Operation = .{ .file_read_streaming = .{ + const result = try io.operate(.{ .file_read_streaming = .{ .file = file, .data = buffer, - } }; - try io.operate(&operation); - return operation.file_read_streaming.status.result; + } }); + return result.file_read_streaming; } pub const ReadPositionalError = error{ diff --git a/lib/std/Io/File/MultiReader.zig b/lib/std/Io/File/MultiReader.zig index 7a0f8de068..217215a363 100644 --- a/lib/std/Io/File/MultiReader.zig +++ b/lib/std/Io/File/MultiReader.zig @@ -22,8 +22,7 @@ pub const UnendingError = Allocator.Error || File.Reader.Error || Io.ConcurrentE /// Trailing: /// * `contexts: [len]Context` -/// * `ring: [len]u32` -/// * `operations: [len]Io.Operation` +/// * `storage: [len]Io.Operation.Storage` pub const Streams = extern struct { len: u32, @@ -33,17 +32,10 @@ pub const Streams = extern struct { return ptr[0..s.len]; } - pub fn ring(s: *Streams) []u32 { + pub fn storage(s: *Streams) []Io.Operation.Storage { const prev = contexts(s); const end = prev.ptr + prev.len; - const ptr: [*]u32 = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(u32))); - return ptr[0..s.len]; - } - - pub fn operations(s: *Streams) []Io.Operation { - const prev = ring(s); - const end = prev.ptr + prev.len; - const ptr: [*]Io.Operation = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(Io.Operation))); + const ptr: [*]Io.Operation.Storage = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(Io.Operation.Storage))); return ptr[0..s.len]; } }; @@ 
-52,8 +44,7 @@ pub fn Buffer(comptime n: usize) type { return extern struct { len: u32, contexts: [n][@sizeOf(Context)]u8 align(@alignOf(Context)), - ring: [n]u32, - operations: [n][@sizeOf(Io.Operation)]u8 align(@alignOf(Io.Operation)), + storage: [n][@sizeOf(Io.Operation.Storage)]u8 align(@alignOf(Io.Operation.Storage)), pub fn toStreams(b: *@This()) *Streams { b.len = n; @@ -86,25 +77,22 @@ pub fn init(mr: *MultiReader, gpa: Allocator, io: Io, streams: *Streams, files: .vec = .{&.{}}, .err = null, }; - const operations = streams.operations(); - const ring = streams.ring(); mr.* = .{ .gpa = gpa, .streams = streams, - .batch = .init(operations, ring), + .batch = .init(streams.storage()), }; - for (operations, contexts, files, 0..) |*op, *context, file, i| { + for (contexts, 0..) |*context, i| { const r = &context.fr.interface; - op.* = .{ .file_read_streaming = .{ - .file = file, - .data = &context.vec, - } }; rebaseGrowing(mr, context, 1) catch |err| { context.err = err; continue; }; context.vec[0] = r.buffer; - mr.batch.add(i); + mr.batch.addAt(@intCast(i), .{ .file_read_streaming = .{ + .file = context.fr.file, + .data = &context.vec, + } }); } } @@ -204,7 +192,7 @@ fn fillUntimed(context: *Context, capacity: usize) Io.Reader.Error!void { }; } -pub const FillError = Io.Batch.WaitError || error{ +pub const FillError = Io.Batch.AwaitConcurrentError || error{ /// `fill` was called when all streams already have failed or reached the /// end. EndOfStream, @@ -213,17 +201,15 @@ pub const FillError = Io.Batch.WaitError || error{ /// Wait until at least one stream receives more data. 
pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillError!void { const contexts = mr.streams.contexts(); - const operations = mr.streams.operations(); const io = contexts[0].fr.io; var any_completed = false; - try mr.batch.wait(io, timeout); + try mr.batch.awaitConcurrent(io, timeout); - while (mr.batch.next()) |i| { + while (mr.batch.next()) |operation| { any_completed = true; - const context = &contexts[i]; - const operation = &operations[i]; - const n = operation.file_read_streaming.status.result catch |err| { + const context = &contexts[operation.index]; + const n = operation.result.file_read_streaming catch |err| { context.err = err; continue; }; @@ -237,15 +223,17 @@ pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillE assert(r.seek == 0); } context.vec[0] = r.buffer[r.end..]; - operation.file_read_streaming.status = .{ .unstarted = {} }; - mr.batch.add(i); + mr.batch.addAt(operation.index, .{ .file_read_streaming = .{ + .file = context.fr.file, + .data = &context.vec, + } }); } if (!any_completed) return error.EndOfStream; } /// Wait until all streams fail or reach the end. 
-pub fn fillRemaining(mr: *MultiReader, timeout: Io.Timeout) Io.Batch.WaitError!void { +pub fn fillRemaining(mr: *MultiReader, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void { while (fill(mr, 1, timeout)) |_| {} else |err| switch (err) { error.EndOfStream => return, else => |e| return e, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index bccaf24ecd..18f24eb40c 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1617,7 +1617,8 @@ pub fn io(t: *Threaded) Io { .futexWake = futexWake, .operate = operate, - .batchWait = batchWait, + .batchAwaitAsync = batchAwaitAsync, + .batchAwaitConcurrent = batchAwaitConcurrent, .batchCancel = batchCancel, .dirCreateDir = dirCreateDir, @@ -1780,7 +1781,8 @@ pub fn ioBasic(t: *Threaded) Io { .futexWake = futexWake, .operate = operate, - .batchWait = batchWait, + .batchAwaitAsync = batchAwaitAsync, + .batchAwaitConcurrent = batchAwaitConcurrent, .batchCancel = batchCancel, .dirCreateDir = dirCreateDir, @@ -2483,85 +2485,227 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void { Thread.futexWake(ptr, max_waiters); } -fn operate(userdata: ?*anyopaque, op: *Io.Operation) Io.Cancelable!void { +fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Operation.Result { const t: *Threaded = @ptrCast(@alignCast(userdata)); - switch (op.*) { - .noop => |*o| { - _ = o.status.unstarted; - o.status = .{ .result = {} }; - }, - .file_read_streaming => |*o| { - _ = o.status.unstarted; - o.status = .{ .result = fileReadStreaming(t, o.file, o.data) catch |err| switch (err) { + switch (operation) { + .file_read_streaming => |o| return .{ + .file_read_streaming = fileReadStreaming(t, o.file, o.data) catch |err| switch (err) { error.Canceled => |e| return e, else => |e| e, - } }; + }, }, } } -fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.WaitError!void { +fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) 
Io.Batch.AwaitAsyncError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (is_windows) return batchWaitWindows(t, b, timeout); + if (is_windows) { + try batchAwaitWindows(b); + const alertable_syscall = try AlertableSyscall.start(); + while (b.pending.head != .none and b.completions.head == .none) waitForApcOrAlert(); + alertable_syscall.finish(); + return; + } if (native_os == .wasi and !builtin.link_libc) @panic("TODO"); - const operations = b.operations; - const len: u31 = @intCast(operations.len); - const ring = b.ring[0..len]; - var submit_head = b.impl.submit_head; - const submit_tail = b.user.submit_tail; - b.impl.submit_tail = submit_tail; - var complete_tail = b.impl.complete_tail; - var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index - var poll_i: u8 = 0; - defer { - for (map_buffer[0..poll_i]) |op| { - submit_head = submit_head.prev(len); - ring[submit_head.index(len)] = op; - } - b.impl.submit_head = submit_head; - b.impl.complete_tail = complete_tail; - b.user.complete_tail = complete_tail; - } var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; - while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) { - const op = ring[submit_head.index(len)]; - const operation = &operations[op]; - switch (operation.*) { - .noop => |*o| { - _ = o.status.unstarted; - o.status = .{ .result = {} }; - submitComplete(ring, &complete_tail, op); - }, - .file_read_streaming => |*o| { - _ = o.status.unstarted; - if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable; - poll_buffer[poll_i] = .{ - .fd = o.file.handle, - .events = posix.POLL.IN, - .revents = 0, - }; - map_buffer[poll_i] = @intCast(op); - poll_i += 1; - }, + var poll_len: u32 = 0; + { + var index = b.submissions.head; + while (index != .none and poll_len < poll_buffer_len) { + const submission = &b.storage[index.toIndex()].submission; + switch (submission.operation) { + .file_read_streaming => |o| { + 
poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 }; + poll_len += 1; + }, + } + index = submission.node.next; } } - switch (poll_i) { + switch (poll_len) { + 0 => return, + 1 => {}, + else => while (true) { + const timeout_ms: i32 = t: { + if (b.completions.head != .none) { + // It is legal to call batchWait with already completed + // operations in the ring. In such case, we need to avoid + // blocking in the poll syscall, but we can still take this + // opportunity to find additional ready operations. + break :t 0; + } + const max_poll_ms = std.math.maxInt(i32); + break :t max_poll_ms; + }; + const syscall = try Syscall.start(); + const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms); + syscall.finish(); + switch (posix.errno(rc)) { + .SUCCESS => { + if (rc == 0) { + if (b.completions.head != .none) { + // Since there are already completions available in the + // queue, this is neither a timeout nor a case for + // retrying. + return; + } + continue; + } + var prev_index: Io.Operation.OptionalIndex = .none; + var index = b.submissions.head; + for (poll_buffer[0..poll_len]) |poll_entry| { + const storage = &b.storage[index.toIndex()]; + const submission = &storage.submission; + const next_index = submission.node.next; + if (poll_entry.revents != 0) { + const result = try operate(t, submission.operation); + + switch (prev_index) { + .none => b.submissions.head = next_index, + else => b.storage[prev_index.toIndex()].submission.node.next = next_index, + } + if (next_index == .none) b.submissions.tail = prev_index; + + switch (b.completions.tail) { + .none => b.completions.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, + } + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + b.completions.tail = index; + } else prev_index = index; + index = next_index; + } + assert(index == .none); + return; + }, + .INTR => continue, + else => break, + } 
+ }, + } + { + var tail_index = b.completions.tail; + defer b.completions.tail = tail_index; + var index = b.submissions.head; + errdefer b.submissions.head = index; + while (index != .none) { + const storage = &b.storage[index.toIndex()]; + const submission = &storage.submission; + const next_index = submission.node.next; + const result = try operate(t, submission.operation); + + switch (tail_index) { + .none => b.completions.head = index, + else => b.storage[tail_index.toIndex()].completion.node.next = index, + } + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + tail_index = index; + index = next_index; + } + b.submissions = .{ .head = .none, .tail = .none }; + } +} + +fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + if (is_windows) { + const deadline: ?Io.Clock.Timestamp = timeout.toDeadline(ioBasic(t)) catch |err| switch (err) { + error.Unexpected => deadline: { + recoverableOsBugDetected(); + break :deadline .{ .raw = .{ .nanoseconds = 0 }, .clock = .awake }; + }, + error.UnsupportedClock => |e| return e, + }; + try batchAwaitWindows(b); + while (b.pending.head != .none and b.completions.head == .none) { + var delay_interval: windows.LARGE_INTEGER = interval: { + const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER); + break :interval t.deadlineToWindowsInterval(d) catch |err| switch (err) { + error.UnsupportedClock => |e| return e, + error.Unexpected => { + recoverableOsBugDetected(); + break :interval -1; + }, + }; + }; + const alertable_syscall = try AlertableSyscall.start(); + const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval); + alertable_syscall.finish(); + switch (delay_rc) { + .SUCCESS, .TIMEOUT => { + // The thread woke due to the timeout. 
Although spurious + // timeouts are OK, when no deadline is passed we must not + // return `error.Timeout`. + if (timeout != .none and b.completions.head == .none) return error.Timeout; + }, + else => {}, + } + } + return; + } + if (native_os == .wasi and !builtin.link_libc) @panic("TODO"); + var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; + var poll_storage: struct { + gpa: std.mem.Allocator, + b: *Io.Batch, + slice: []posix.pollfd, + len: u32, + + fn add(storage: *@This(), file: Io.File, events: @FieldType(posix.pollfd, "events")) Io.ConcurrentError!void { + const len = storage.len; + if (len == poll_buffer_len) { + const slice: []posix.pollfd = if (storage.b.context) |context| + @as([*]posix.pollfd, @ptrCast(@alignCast(context)))[0..storage.b.storage.len] + else allocation: { + const allocation = storage.gpa.alloc(posix.pollfd, storage.b.storage.len) catch + return error.ConcurrencyUnavailable; + storage.b.context = allocation.ptr; + break :allocation allocation; + }; + @memcpy(slice[0..poll_buffer_len], storage.slice); + } + storage.slice[len] = .{ + .fd = file.handle, + .events = events, + .revents = 0, + }; + storage.len = len + 1; + } + } = .{ .gpa = t.allocator, .b = b, .slice = &poll_buffer, .len = 0 }; + { + var index = b.submissions.head; + while (index != .none) { + const submission = &b.storage[index.toIndex()].submission; + switch (submission.operation) { + .file_read_streaming => |o| try poll_storage.add(o.file, posix.POLL.IN), + } + index = submission.node.next; + } + } + switch (poll_storage.len) { 0 => return, 1 => if (timeout == .none) { - const op = map_buffer[0]; - try operate(t, &operations[op]); - submitComplete(ring, &complete_tail, op); - poll_i = 0; + const index = b.submissions.head; + const storage = &b.storage[index.toIndex()]; + const result = try operate(t, storage.submission.operation); + + b.submissions = .{ .head = .none, .tail = .none }; + + switch (b.completions.tail) { + .none => b.completions.head = index, + else => 
|tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, + } + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + b.completions.tail = index; return; }, else => {}, } const t_io = ioBasic(t); const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock; - const max_poll_ms = std.math.maxInt(i32); while (true) { const timeout_ms: i32 = t: { - if (b.user.complete_head != complete_tail) { + if (b.completions.head != .none) { // It is legal to call batchWait with already completed // operations in the ring. In such case, we need to avoid // blocking in the poll syscall, but we can still take this @@ -2571,15 +2715,16 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch. const d = deadline orelse break :t -1; const duration = d.durationFromNow(t_io) catch return error.UnsupportedClock; if (duration.raw.nanoseconds <= 0) return error.Timeout; + const max_poll_ms = std.math.maxInt(i32); break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds())); }; const syscall = try Syscall.start(); - const rc = posix.system.poll(&poll_buffer, poll_i, timeout_ms); + const rc = posix.system.poll(&poll_buffer, poll_storage.len, timeout_ms); syscall.finish(); switch (posix.errno(rc)) { .SUCCESS => { if (rc == 0) { - if (b.user.complete_head != complete_tail) { + if (b.completions.head != .none) { // Since there are already completions available in the // queue, this is neither a timeout nor a case for // retrying. @@ -2590,18 +2735,30 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch. 
if (deadline == null) continue; return error.Timeout; } - while (poll_i != 0) { - poll_i -= 1; - const poll_fd = &poll_buffer[poll_i]; - const op = map_buffer[poll_i]; - if (poll_fd.revents == 0) { - submit_head = submit_head.prev(len); - ring[submit_head.index(len)] = op; - } else { - try operate(t, &operations[op]); - submitComplete(ring, &complete_tail, op); - } + var prev_index: Io.Operation.OptionalIndex = .none; + var index = b.submissions.head; + for (poll_storage.slice[0..poll_storage.len]) |poll_entry| { + const submission = &b.storage[index.toIndex()].submission; + const next_index = submission.node.next; + if (poll_entry.revents != 0) { + const result = try operate(t, submission.operation); + + switch (prev_index) { + .none => b.submissions.head = next_index, + else => b.storage[prev_index.toIndex()].submission.node.next = next_index, + } + if (next_index == .none) b.submissions.tail = prev_index; + + switch (b.completions.tail) { + .none => b.completions.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, + } + b.completions.tail = index; + b.storage[index.toIndex()] = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + } else prev_index = index; + index = next_index; } + assert(index == .none); return; }, .INTR => continue, @@ -2610,166 +2767,126 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch. 
} } -fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void { - const t: *Threaded = @ptrCast(@alignCast(userdata)); - const operations = b.operations; - const len: u31 = @intCast(operations.len); - const ring = b.ring[0..len]; - var submit_head = b.impl.submit_head; - const submit_tail = b.user.submit_tail; - b.impl.submit_tail = submit_tail; - var complete_tail = b.impl.complete_tail; - while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) { - const op = ring[submit_head.index(len)]; - switch (operations[op]) { - .noop => |*o| { - _ = o.status.unstarted; - o.status = .{ .result = {} }; - submitComplete(ring, &complete_tail, op); - }, - .file_read_streaming => |*o| _ = o.status.unstarted, - } - } - if (is_windows) { - // Iterate over pending and issue cancelations, then free the allocation for IO_STATUS_BLOCK - if (b.impl.reserved) |reserved| { - const gpa = t.allocator; - const metadatas_ptr: [*]WinOpMetadata = @ptrCast(@alignCast(reserved)); - const metadatas = metadatas_ptr[0..b.operations.len]; - for (metadatas, 0..) 
|*metadata, op| { - if (!metadata.pending) continue; - const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING; - if (done) continue; - switch (operations[op]) { - .noop => unreachable, - .file_read_streaming => |*o| { - _ = windows.ntdll.NtCancelIoFile(o.file.handle, &metadata.iosb); - }, - } - } - for (metadatas) |*metadata| { - if (!metadata.pending) continue; - while (@atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) == .PENDING) { - waitForApcOrAlert(); - } - } - gpa.free(metadatas); - b.impl.reserved = null; - } - } - b.impl.submit_head = submit_tail; - b.impl.complete_tail = complete_tail; - b.user.complete_tail = complete_tail; -} - -const WinOpMetadata = struct { +const WindowsBatchPendingOperationContext = extern struct { + file: windows.HANDLE, iosb: windows.IO_STATUS_BLOCK, - pending: bool, + + const Erased = [3]usize; + + comptime { + assert(@sizeOf(Erased) <= @sizeOf(WindowsBatchPendingOperationContext)); + } + + fn toErased(context: *WindowsBatchPendingOperationContext) *Erased { + return @ptrCast(context); + } + + fn fromErased(erased: *Erased) *WindowsBatchPendingOperationContext { + return @ptrCast(erased); + } }; -fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.WaitError!void { - const operations = b.operations; - const len: u31 = @intCast(operations.len); - const ring = b.ring[0..len]; - var submit_head = b.impl.submit_head; - const submit_tail = b.user.submit_tail; - b.impl.submit_tail = submit_tail; - var complete_tail = b.impl.complete_tail; - - const metadatas_ptr: [*]WinOpMetadata = if (b.impl.reserved) |reserved| @ptrCast(@alignCast(reserved)) else a: { - const gpa = t.allocator; - const metadatas = gpa.alloc(WinOpMetadata, operations.len) catch return error.ConcurrencyUnavailable; - b.impl.reserved = metadatas.ptr; - @memset(metadatas, .{ .iosb = undefined, .pending = false }); - break :a metadatas.ptr; - }; - const metadatas = metadatas_ptr[0..operations.len]; 
- - defer { - b.impl.submit_head = submit_head; - b.impl.complete_tail = complete_tail; - b.user.complete_tail = complete_tail; - } - - while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) { - const op = ring[submit_head.index(len)]; - const operation = &operations[op]; - const metadata = &metadatas[op]; - metadata.* = .{ .iosb = .{ - .u = .{ .Status = .PENDING }, - .Information = 0, - }, .pending = false }; - switch (operation.*) { - .noop => |*o| { - _ = o.status.unstarted; - o.status = .{ .result = {} }; - submitComplete(ring, &complete_tail, op); - }, - .file_read_streaming => |*o| { - _ = o.status.unstarted; - try ntReadFile(o.file.handle, o.data, &metadata.iosb); - if (@atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) == .PENDING) { - o.status = .{ .pending = b }; - metadata.pending = true; - } else { - o.status = .{ .result = ntReadFileResult(&metadata.iosb) }; - submitComplete(ring, &complete_tail, op); - } - }, - } - } - - const deadline: ?Io.Clock.Timestamp = timeout.toDeadline(ioBasic(t)) catch |err| switch (err) { - error.Unexpected => deadline: { - recoverableOsBugDetected(); - break :deadline .{ .raw = .{ .nanoseconds = 0 }, .clock = .awake }; - }, - error.UnsupportedClock => |e| return e, - }; - - while (true) { - var any_pending = false; - for (metadatas, 0..) 
|*metadata, op_usize| { - if (!metadata.pending) continue; - any_pending = true; - const op: u31 = @intCast(op_usize); - const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING; - switch (operations[op]) { - .noop => unreachable, - .file_read_streaming => |*o| { - assert(o.status.pending == b); - if (!done) continue; - o.status = .{ .result = ntReadFileResult(&metadata.iosb) }; - }, +fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + { + var tail_index = b.unused.tail; + defer b.unused.tail = tail_index; + var index = b.submissions.head; + errdefer b.submissions.head = index; + while (index != .none) { + const next_index = b.storage[index.toIndex()].submission.node.next; + switch (tail_index) { + .none => b.unused.head = index, + else => b.storage[tail_index.toIndex()].unused.next = index, } - metadata.pending = false; - submitComplete(ring, &complete_tail, op); - } - if (b.user.complete_head != complete_tail) return; - if (!any_pending) return; - var delay_interval: windows.LARGE_INTEGER = interval: { - const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER); - break :interval t.deadlineToWindowsInterval(d) catch |err| switch (err) { - error.UnsupportedClock => |e| return e, - error.Unexpected => { - recoverableOsBugDetected(); - break :interval -1; - }, - }; - }; - const alertable_syscall = try AlertableSyscall.start(); - const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval); - alertable_syscall.finish(); - switch (delay_rc) { - .SUCCESS, .TIMEOUT => { - // The thread woke due to the timeout. Although spurious - // timeouts are OK, when no deadline is passed we must not - // return `error.Timeout`. 
- if (timeout != .none) return error.Timeout; - }, - else => {}, + b.storage[index.toIndex()] = .{ .unused = .{ .prev = tail_index, .next = .none } }; + tail_index = index; + index = next_index; } + b.submissions = .{ .head = .none, .tail = .none }; } + if (is_windows) { + var index = b.pending.head; + while (index != .none) { + const pending = &b.storage[index.toIndex()].pending; + const context: *WindowsBatchPendingOperationContext = .fromErased(&pending.context); + _ = windows.ntdll.NtCancelIoFile(context.file, &context.iosb); + index = pending.node.next; + } + while (b.pending.head != .none) waitForApcOrAlert(); + } else if (b.context) |context| { + t.allocator.free(@as([*]posix.pollfd, @ptrCast(@alignCast(context)))[0..b.storage.len]); + b.context = null; + } + assert(b.pending.head == .none); +} + +fn batchApc(apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void { + const b: *Io.Batch = @ptrCast(@alignCast(apc_context)); + const context: *WindowsBatchPendingOperationContext = @fieldParentPtr("iosb", iosb); + const erased_context = context.toErased(); + const pending: *Io.Operation.Storage.Pending = @fieldParentPtr("context", erased_context); + switch (pending.node.prev) { + .none => b.pending.head = pending.node.next, + else => |prev_index| b.storage[prev_index.toIndex()].pending.node.next = pending.node.next, + } + switch (pending.node.next) { + .none => b.pending.tail = pending.node.prev, + else => |next_index| b.storage[next_index.toIndex()].pending.node.prev = pending.node.prev, + } + const storage: *Io.Operation.Storage = @fieldParentPtr("pending", pending); + const index = storage - b.storage.ptr; + switch (iosb.u.Status) { + .CANCELLED => { + const tail_index = b.unused.tail; + switch (tail_index) { + .none => b.unused.head = .fromIndex(index), + else => b.storage[tail_index.toIndex()].unused.next = .fromIndex(index), + } + storage.* = .{ .unused = .{ .prev = tail_index, .next = .none } }; + b.unused.tail = 
.fromIndex(index); + }, + else => { + switch (b.completions.tail) { + .none => b.completions.head = .fromIndex(index), + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = .fromIndex(index), + } + b.completions.tail = .fromIndex(index); + const result: Io.Operation.Result = switch (pending.tag) { + .file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) }, + }; + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + }, + } +} + +fn batchAwaitWindows(b: *Io.Batch) Io.Cancelable!void { + var index = b.submissions.head; + errdefer b.submissions.head = index; + while (index != .none) { + const storage = &b.storage[index.toIndex()]; + const submission = storage.submission; + errdefer storage.* = .{ .submission = submission }; + storage.* = .{ .pending = .{ + .node = .{ .prev = b.pending.tail, .next = .none }, + .tag = submission.operation, + .context = undefined, + } }; + const context: *WindowsBatchPendingOperationContext = .fromErased(&storage.pending.context); + switch (submission.operation) { + .file_read_streaming => |o| { + context.file = o.file.handle; + try ntReadFile(o.file.handle, o.data, &batchApc, b, &context.iosb); + }, + } + switch (b.pending.tail) { + .none => b.pending.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].pending.node.next = index, + } + b.pending.tail = index; + index = submission.node.next; + } + b.submissions = .{ .head = .none, .tail = .none }; } fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void { @@ -8701,7 +8818,7 @@ fn fileReadStreamingWindows(file: File, data: []const []u8) File.ReadStreamingEr .u = .{ .Status = .PENDING }, .Information = 0, }; - try ntReadFile(file.handle, data, &io_status_block); + try ntReadFile(file.handle, data, &noopApc, null, &io_status_block); while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) { // Once we get here we must not return from the function 
until the
         // operation completes, thereby releasing reference to io_status_block.
@@ -8736,12 +8853,20 @@ fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize {
     }
 }
 
-fn ntReadFile(handle: windows.HANDLE, data: []const []u8, iosb: *windows.IO_STATUS_BLOCK) Io.Cancelable!void {
+fn ntReadFile(
+    handle: windows.HANDLE,
+    data: []const []u8,
+    apcRoutine: ?*const windows.IO_APC_ROUTINE,
+    apc_context: ?*anyopaque,
+    iosb: *windows.IO_STATUS_BLOCK,
+) Io.Cancelable!void {
     var index: usize = 0;
     while (index < data.len and data[index].len == 0) index += 1;
     if (index == data.len) {
-        iosb.u.Status = .SUCCESS;
-        iosb.Information = 0;
+        iosb.* = .{ .u = .{ .Status = .SUCCESS }, .Information = 0 };
+        if (apcRoutine) |routine| if (routine != &noopApc) {
+            _ = windows.ntdll.NtQueueApcThread(windows.current_thread, routine, apc_context, iosb, null);
+        };
         return;
     }
     const buffer = data[index];
@@ -8750,8 +8875,8 @@ fn ntReadFile(handle: windows.HANDLE, data: []const []u8, iosb: *windows.IO_STAT
     while (true) switch (windows.ntdll.NtReadFile(
         handle,
         null, // event
-        noopApc, // apc callback
-        null, // apc context
+        apcRoutine,
+        apc_context,
         iosb,
         buffer.ptr,
         @min(std.math.maxInt(u32), buffer.len),