Io.Batch: implement alternate API

This commit is contained in:
Jacob Young
2026-01-30 01:44:07 -05:00
committed by Andrew Kelley
parent a520355e4c
commit 2674acdb77
4 changed files with 517 additions and 376 deletions
+137 -108
View File
@@ -149,8 +149,9 @@ pub const VTable = struct {
futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
operate: *const fn (?*anyopaque, *Operation) Cancelable!void,
batchWait: *const fn (?*anyopaque, *Batch, Timeout) Batch.WaitError!void,
operate: *const fn (?*anyopaque, Operation) Cancelable!Operation.Result,
batchAwaitAsync: *const fn (?*anyopaque, *Batch) Batch.AwaitAsyncError!void,
batchAwaitConcurrent: *const fn (?*anyopaque, *Batch, Timeout) Batch.AwaitConcurrentError!void,
batchCancel: *const fn (?*anyopaque, *Batch) void,
dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void,
@@ -255,19 +256,14 @@ pub const VTable = struct {
};
pub const Operation = union(enum) {
noop: Noop,
file_read_streaming: FileReadStreaming,
pub const Noop = struct {
reserved: [2]usize = .{ 0, 0 },
status: Status(void) = .{ .unstarted = {} },
};
pub const Tag = @typeInfo(Operation).@"union".tag_type.?;
/// May return 0 reads which is different than `error.EndOfStream`.
pub const FileReadStreaming = struct {
file: File,
data: []const []u8,
status: Status(Error!usize) = .{ .unstarted = {} },
pub const Error = UnendingError || error{EndOfStream};
pub const UnendingError = error{
@@ -290,19 +286,72 @@ pub const Operation = union(enum) {
/// lock.
LockViolation,
} || Io.UnexpectedError;
pub const Result = usize;
};
pub fn Status(Result: type) type {
return union {
unstarted: void,
pending: *Batch,
pub const Result = Result: {
const operation_fields = @typeInfo(Operation).@"union".fields;
var field_names: [operation_fields.len][]const u8 = undefined;
var field_types: [operation_fields.len]type = undefined;
for (operation_fields, &field_names, &field_types) |field, *field_name, *field_type| {
field_name.* = field.name;
field_type.* = field.type.Error!field.type.Result;
}
break :Result @Union(.auto, Tag, &field_names, &field_types, &@splat(.{}));
};
pub const Storage = union {
unused: List.DoubleNode,
submission: Submission,
pending: Pending,
completion: Completion,
pub const Submission = struct {
node: List.SingleNode,
operation: Operation,
};
pub const Pending = struct {
node: List.DoubleNode,
tag: Tag,
context: [3]usize,
};
pub const Completion = struct {
node: List.SingleNode,
result: Result,
};
}
};
pub const OptionalIndex = enum(u32) {
none = std.math.maxInt(u32),
_,
pub fn fromIndex(i: usize) OptionalIndex {
const oi: OptionalIndex = @enumFromInt(i);
assert(oi != .none);
return oi;
}
pub fn toIndex(oi: OptionalIndex) u32 {
assert(oi != .none);
return @intFromEnum(oi);
}
};
pub const List = struct {
head: OptionalIndex,
tail: OptionalIndex,
pub const empty: List = .{ .head = .none, .tail = .none };
pub const SingleNode = struct { next: OptionalIndex };
pub const DoubleNode = struct { prev: OptionalIndex, next: OptionalIndex };
};
};
/// Performs one `Operation`.
pub fn operate(io: Io, operation: *Operation) Cancelable!void {
pub fn operate(io: Io, operation: Operation) Cancelable!Operation.Result {
return io.vtable.operate(io.userdata, operation);
}
@@ -312,116 +361,96 @@ pub fn operate(io: Io, operation: *Operation) Cancelable!void {
/// This is a low-level abstraction based on `Operation`. For a higher
/// level API that operates on `Future`, see `Select`.
pub const Batch = struct {
operations: []Operation,
ring: [*]u32,
user: struct {
submit_tail: RingIndex,
complete_head: RingIndex,
complete_tail: RingIndex,
},
impl: struct {
submit_head: RingIndex,
submit_tail: RingIndex,
complete_tail: RingIndex,
reserved: ?*anyopaque,
},
pub const RingIndex = enum(u32) {
_,
pub fn index(ri: RingIndex, len: u31) u31 {
const i = @intFromEnum(ri);
assert(i < @as(u32, len) * 2);
return @intCast(if (i < len) i else i - len);
}
pub fn prev(ri: RingIndex, len: u31) RingIndex {
const i = @intFromEnum(ri);
const double_len = @as(u32, len) * 2;
assert(i <= double_len);
return @enumFromInt((if (i > 0) i else double_len) - 1);
}
pub fn next(ri: RingIndex, len: u31) RingIndex {
const i = @intFromEnum(ri) + 1;
const double_len = @as(u32, len) * 2;
assert(i <= double_len);
return @enumFromInt(if (i < double_len) i else 0);
}
};
storage: []Operation.Storage,
unused: Operation.List,
submissions: Operation.List,
pending: Operation.List,
completions: Operation.List,
context: ?*anyopaque,
/// After calling this, it is safe to unconditionally defer a call to
/// `cancel`.
pub fn init(operations: []Operation, ring: []u32) Batch {
const len: u31 = @intCast(operations.len);
assert(ring.len == len);
pub fn init(storage: []Operation.Storage) Batch {
var prev: Operation.OptionalIndex = .none;
for (storage, 0..) |*operation, index| {
operation.* = .{ .unused = .{ .prev = prev, .next = .fromIndex(index + 1) } };
prev = .fromIndex(index);
}
storage[storage.len - 1].unused.next = .none;
return .{
.operations = operations,
.ring = ring.ptr,
.user = .{
.submit_tail = @enumFromInt(0),
.complete_head = @enumFromInt(0),
.complete_tail = @enumFromInt(0),
},
.impl = .{
.submit_head = @enumFromInt(0),
.submit_tail = @enumFromInt(0),
.complete_tail = @enumFromInt(0),
.reserved = null,
.storage = storage,
.unused = .{
.head = .fromIndex(0),
.tail = .fromIndex(storage.len - 1),
},
.submissions = .empty,
.pending = .empty,
.completions = .empty,
.context = null,
};
}
/// Adds `b.operations[operation]` to the list of submitted operations
/// that will be performed when `wait` is called.
pub fn add(b: *Batch, operation: usize) void {
const tail = b.user.submit_tail;
const len: u31 = @intCast(b.operations.len);
b.user.submit_tail = tail.next(len);
b.ring[0..len][tail.index(len)] = @intCast(operation);
/// Adds an operation to be performed at the next await call.
/// Returns the index that will be returned by `next` after the operation completes.
/// Asserts that no more than `storage.len` operations are active at a time.
pub fn add(b: *Batch, operation: Operation) u32 {
const index = b.unused.next;
b.addAt(index.toIndex(), operation);
return index;
}
fn flush(b: *Batch) void {
@atomicStore(RingIndex, &b.impl.submit_tail, b.user.submit_tail, .release);
}
/// Returns `operation` such that `b.operations[operation]` has completed.
/// Returns `null` when `wait` should be called.
pub fn next(b: *Batch) ?u32 {
const head = b.user.complete_head;
if (head == b.user.complete_tail) {
@branchHint(.unlikely);
b.flush();
const tail = @atomicLoad(RingIndex, &b.impl.complete_tail, .acquire);
if (head == tail) {
@branchHint(.unlikely);
return null;
}
assert(head != tail);
b.user.complete_tail = tail;
/// Adds an operation to be performed at the next await call.
/// After the operation completes, `next` will return `index`.
/// Asserts that the operation at `index` is not active.
pub fn addAt(b: *Batch, index: u32, operation: Operation) void {
const storage = &b.storage[index];
const unused = storage.unused;
switch (unused.prev) {
.none => b.unused.head = .none,
else => |prev_index| b.storage[prev_index.toIndex()].unused.next = unused.next,
}
const len: u31 = @intCast(b.operations.len);
b.user.complete_head = head.next(len);
return b.ring[0..len][head.index(len)];
switch (unused.next) {
.none => b.unused.tail = .none,
else => |next_index| b.storage[next_index.toIndex()].unused.prev = unused.prev,
}
switch (b.submissions.tail) {
.none => b.submissions.head = .fromIndex(index),
else => |tail_index| b.storage[tail_index.toIndex()].submission.node.next = .fromIndex(index),
}
storage.* = .{ .submission = .{ .node = .{ .next = .none }, .operation = operation } };
b.submissions.tail = .fromIndex(index);
}
pub const WaitError = ConcurrentError || Cancelable || Timeout.Error;
pub fn next(b: *Batch) ?struct { index: u32, result: Operation.Result } {
const index = b.completions.head;
if (index == .none) return null;
const storage = &b.storage[index.toIndex()];
const completion = storage.completion;
const next_index = completion.node.next;
b.completions.head = next_index;
if (next_index == .none) b.completions.tail = .none;
/// Starts work on any submitted operations and returns when at least one has completeed.
///
/// Returns `error.Timeout` if `timeout` expires first.
///
/// Depending on the `Io` implementation, may allocate resources that are
/// freed with `cancel`, even if an error is returned.
pub fn wait(b: *Batch, io: Io, timeout: Timeout) WaitError!void {
return io.vtable.batchWait(io.userdata, b, timeout);
const tail_index = b.unused.tail;
switch (tail_index) {
.none => b.unused.head = index,
else => b.storage[tail_index.toIndex()].unused.next = index,
}
storage.* = .{ .unused = .{ .prev = tail_index, .next = .none } };
b.unused.tail = index;
return .{ .index = index.toIndex(), .result = completion.result };
}
pub const AwaitAsyncError = Cancelable;
pub fn awaitAsync(b: *Batch, io: Io) AwaitAsyncError!void {
return io.vtable.batchAwaitAsync(io.userdata, b);
}
pub const AwaitConcurrentError = ConcurrentError || Cancelable || Timeout.Error;
pub fn awaitConcurrent(b: *Batch, io: Io, timeout: Timeout) AwaitConcurrentError!void {
return io.vtable.batchAwaitConcurrent(io.userdata, b, timeout);
}
/// Returns after all `operations` have completed. Operations which have not completed
/// after this function returns were successfully dropped and had no side effects.
///
/// This function is idempotent with respect to itself and `wait`. It is
/// safe to unconditionally `defer` a call to this function after `init`.
pub fn cancel(b: *Batch, io: Io) void {
return io.vtable.batchCancel(io.userdata, b);
}
+3 -4
View File
@@ -559,12 +559,11 @@ pub const ReadStreamingError = error{EndOfStream} || Reader.Error;
/// See also:
/// * `reader`
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize {
var operation: Io.Operation = .{ .file_read_streaming = .{
const result = try io.operate(.{ .file_read_streaming = .{
.file = file,
.data = buffer,
} };
try io.operate(&operation);
return operation.file_read_streaming.status.result;
} });
return result.file_read_streaming;
}
pub const ReadPositionalError = error{
+20 -32
View File
@@ -22,8 +22,7 @@ pub const UnendingError = Allocator.Error || File.Reader.Error || Io.ConcurrentE
/// Trailing:
/// * `contexts: [len]Context`
/// * `ring: [len]u32`
/// * `operations: [len]Io.Operation`
/// * `storage: [len]Io.Operation.Storage`
pub const Streams = extern struct {
len: u32,
@@ -33,17 +32,10 @@ pub const Streams = extern struct {
return ptr[0..s.len];
}
pub fn ring(s: *Streams) []u32 {
pub fn storage(s: *Streams) []Io.Operation.Storage {
const prev = contexts(s);
const end = prev.ptr + prev.len;
const ptr: [*]u32 = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(u32)));
return ptr[0..s.len];
}
pub fn operations(s: *Streams) []Io.Operation {
const prev = ring(s);
const end = prev.ptr + prev.len;
const ptr: [*]Io.Operation = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(Io.Operation)));
const ptr: [*]Io.Operation.Storage = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(Io.Operation.Storage)));
return ptr[0..s.len];
}
};
@@ -52,8 +44,7 @@ pub fn Buffer(comptime n: usize) type {
return extern struct {
len: u32,
contexts: [n][@sizeOf(Context)]u8 align(@alignOf(Context)),
ring: [n]u32,
operations: [n][@sizeOf(Io.Operation)]u8 align(@alignOf(Io.Operation)),
storage: [n][@sizeOf(Io.Operation.Storage)]u8 align(@alignOf(Io.Operation.Storage)),
pub fn toStreams(b: *@This()) *Streams {
b.len = n;
@@ -86,25 +77,22 @@ pub fn init(mr: *MultiReader, gpa: Allocator, io: Io, streams: *Streams, files:
.vec = .{&.{}},
.err = null,
};
const operations = streams.operations();
const ring = streams.ring();
mr.* = .{
.gpa = gpa,
.streams = streams,
.batch = .init(operations, ring),
.batch = .init(streams.storage()),
};
for (operations, contexts, files, 0..) |*op, *context, file, i| {
for (contexts, 0..) |*context, i| {
const r = &context.fr.interface;
op.* = .{ .file_read_streaming = .{
.file = file,
.data = &context.vec,
} };
rebaseGrowing(mr, context, 1) catch |err| {
context.err = err;
continue;
};
context.vec[0] = r.buffer;
mr.batch.add(i);
mr.batch.addAt(@intCast(i), .{ .file_read_streaming = .{
.file = context.fr.file,
.data = &context.vec,
} });
}
}
@@ -204,7 +192,7 @@ fn fillUntimed(context: *Context, capacity: usize) Io.Reader.Error!void {
};
}
pub const FillError = Io.Batch.WaitError || error{
pub const FillError = Io.Batch.AwaitConcurrentError || error{
/// `fill` was called when all streams already have failed or reached the
/// end.
EndOfStream,
@@ -213,17 +201,15 @@ pub const FillError = Io.Batch.WaitError || error{
/// Wait until at least one stream receives more data.
pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillError!void {
const contexts = mr.streams.contexts();
const operations = mr.streams.operations();
const io = contexts[0].fr.io;
var any_completed = false;
try mr.batch.wait(io, timeout);
try mr.batch.awaitConcurrent(io, timeout);
while (mr.batch.next()) |i| {
while (mr.batch.next()) |operation| {
any_completed = true;
const context = &contexts[i];
const operation = &operations[i];
const n = operation.file_read_streaming.status.result catch |err| {
const context = &contexts[operation.index];
const n = operation.result.file_read_streaming catch |err| {
context.err = err;
continue;
};
@@ -237,15 +223,17 @@ pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillE
assert(r.seek == 0);
}
context.vec[0] = r.buffer[r.end..];
operation.file_read_streaming.status = .{ .unstarted = {} };
mr.batch.add(i);
mr.batch.addAt(operation.index, .{ .file_read_streaming = .{
.file = context.fr.file,
.data = &context.vec,
} });
}
if (!any_completed) return error.EndOfStream;
}
/// Wait until all streams fail or reach the end.
pub fn fillRemaining(mr: *MultiReader, timeout: Io.Timeout) Io.Batch.WaitError!void {
pub fn fillRemaining(mr: *MultiReader, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void {
while (fill(mr, 1, timeout)) |_| {} else |err| switch (err) {
error.EndOfStream => return,
else => |e| return e,
+357 -232
View File
@@ -1617,7 +1617,8 @@ pub fn io(t: *Threaded) Io {
.futexWake = futexWake,
.operate = operate,
.batchWait = batchWait,
.batchAwaitAsync = batchAwaitAsync,
.batchAwaitConcurrent = batchAwaitConcurrent,
.batchCancel = batchCancel,
.dirCreateDir = dirCreateDir,
@@ -1780,7 +1781,8 @@ pub fn ioBasic(t: *Threaded) Io {
.futexWake = futexWake,
.operate = operate,
.batchWait = batchWait,
.batchAwaitAsync = batchAwaitAsync,
.batchAwaitConcurrent = batchAwaitConcurrent,
.batchCancel = batchCancel,
.dirCreateDir = dirCreateDir,
@@ -2483,85 +2485,227 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void {
Thread.futexWake(ptr, max_waiters);
}
fn operate(userdata: ?*anyopaque, op: *Io.Operation) Io.Cancelable!void {
fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Operation.Result {
const t: *Threaded = @ptrCast(@alignCast(userdata));
switch (op.*) {
.noop => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = {} };
},
.file_read_streaming => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = fileReadStreaming(t, o.file, o.data) catch |err| switch (err) {
switch (operation) {
.file_read_streaming => |o| return .{
.file_read_streaming = fileReadStreaming(t, o.file, o.data) catch |err| switch (err) {
error.Canceled => |e| return e,
else => |e| e,
} };
},
},
}
}
fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.WaitError!void {
fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Batch.AwaitAsyncError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) return batchWaitWindows(t, b, timeout);
if (is_windows) {
try batchAwaitWindows(b);
const alertable_syscall = try AlertableSyscall.start();
while (b.pending.head != .none and b.completions.head == .none) waitForApcOrAlert();
alertable_syscall.finish();
return;
}
if (native_os == .wasi and !builtin.link_libc) @panic("TODO");
const operations = b.operations;
const len: u31 = @intCast(operations.len);
const ring = b.ring[0..len];
var submit_head = b.impl.submit_head;
const submit_tail = b.user.submit_tail;
b.impl.submit_tail = submit_tail;
var complete_tail = b.impl.complete_tail;
var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index
var poll_i: u8 = 0;
defer {
for (map_buffer[0..poll_i]) |op| {
submit_head = submit_head.prev(len);
ring[submit_head.index(len)] = op;
}
b.impl.submit_head = submit_head;
b.impl.complete_tail = complete_tail;
b.user.complete_tail = complete_tail;
}
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) {
const op = ring[submit_head.index(len)];
const operation = &operations[op];
switch (operation.*) {
.noop => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = {} };
submitComplete(ring, &complete_tail, op);
},
.file_read_streaming => |*o| {
_ = o.status.unstarted;
if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable;
poll_buffer[poll_i] = .{
.fd = o.file.handle,
.events = posix.POLL.IN,
.revents = 0,
};
map_buffer[poll_i] = @intCast(op);
poll_i += 1;
},
var poll_len: u32 = 0;
{
var index = b.submissions.head;
while (index != .none and poll_len < poll_buffer_len) {
const submission = &b.storage[index.toIndex()].submission;
switch (submission.operation) {
.file_read_streaming => |o| {
poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 };
poll_len += 1;
},
}
index = submission.node.next;
}
}
switch (poll_i) {
switch (poll_len) {
0 => return,
1 => {},
else => while (true) {
const timeout_ms: i32 = t: {
if (b.completions.head != .none) {
// It is legal to call batchWait with already completed
// operations in the ring. In such case, we need to avoid
// blocking in the poll syscall, but we can still take this
// opportunity to find additional ready operations.
break :t 0;
}
const max_poll_ms = std.math.maxInt(i32);
break :t max_poll_ms;
};
const syscall = try Syscall.start();
const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms);
syscall.finish();
switch (posix.errno(rc)) {
.SUCCESS => {
if (rc == 0) {
if (b.completions.head != .none) {
// Since there are already completions available in the
// queue, this is neither a timeout nor a case for
// retrying.
return;
}
continue;
}
var prev_index: Io.Operation.OptionalIndex = .none;
var index = b.submissions.head;
for (poll_buffer[0..poll_len]) |poll_entry| {
const storage = &b.storage[index.toIndex()];
const submission = &storage.submission;
const next_index = submission.node.next;
if (poll_entry.revents != 0) {
const result = try operate(t, submission.operation);
switch (prev_index) {
.none => b.submissions.head = next_index,
else => b.storage[prev_index.toIndex()].submission.node.next = next_index,
}
if (next_index == .none) b.submissions.tail = prev_index;
switch (b.completions.tail) {
.none => b.completions.head = index,
else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
}
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
b.completions.tail = index;
} else prev_index = index;
index = next_index;
}
assert(index == .none);
return;
},
.INTR => continue,
else => break,
}
},
}
{
var tail_index = b.completions.tail;
defer b.completions.tail = tail_index;
var index = b.submissions.head;
errdefer b.submissions.head = index;
while (index != .none) {
const storage = &b.storage[index.toIndex()];
const submission = &storage.submission;
const next_index = submission.node.next;
const result = try operate(t, submission.operation);
switch (tail_index) {
.none => b.completions.head = index,
else => b.storage[tail_index.toIndex()].completion.node.next = index,
}
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
tail_index = index;
index = next_index;
}
b.submissions = .{ .head = .none, .tail = .none };
}
}
fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) {
const deadline: ?Io.Clock.Timestamp = timeout.toDeadline(ioBasic(t)) catch |err| switch (err) {
error.Unexpected => deadline: {
recoverableOsBugDetected();
break :deadline .{ .raw = .{ .nanoseconds = 0 }, .clock = .awake };
},
error.UnsupportedClock => |e| return e,
};
try batchAwaitWindows(b);
while (b.pending.head != .none and b.completions.head == .none) {
var delay_interval: windows.LARGE_INTEGER = interval: {
const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER);
break :interval t.deadlineToWindowsInterval(d) catch |err| switch (err) {
error.UnsupportedClock => |e| return e,
error.Unexpected => {
recoverableOsBugDetected();
break :interval -1;
},
};
};
const alertable_syscall = try AlertableSyscall.start();
const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval);
alertable_syscall.finish();
switch (delay_rc) {
.SUCCESS, .TIMEOUT => {
// The thread woke due to the timeout. Although spurious
// timeouts are OK, when no deadline is passed we must not
// return `error.Timeout`.
if (timeout != .none and b.completions.head == .none) return error.Timeout;
},
else => {},
}
}
return;
}
if (native_os == .wasi and !builtin.link_libc) @panic("TODO");
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
var poll_storage: struct {
gpa: std.mem.Allocator,
b: *Io.Batch,
slice: []posix.pollfd,
len: u32,
fn add(storage: *@This(), file: Io.File, events: @FieldType(posix.pollfd, "events")) Io.ConcurrentError!void {
const len = storage.len;
if (len == poll_buffer_len) {
const slice: []posix.pollfd = if (storage.b.context) |context|
@as([*]posix.pollfd, @ptrCast(@alignCast(context)))[0..storage.b.storage.len]
else allocation: {
const allocation = storage.gpa.alloc(posix.pollfd, storage.b.storage.len) catch
return error.ConcurrencyUnavailable;
storage.b.context = allocation.ptr;
break :allocation allocation;
};
@memcpy(slice[0..poll_buffer_len], storage.slice);
}
storage.slice[len] = .{
.fd = file.handle,
.events = events,
.revents = 0,
};
storage.len = len + 1;
}
} = .{ .gpa = t.allocator, .b = b, .slice = &poll_buffer, .len = 0 };
{
var index = b.submissions.head;
while (index != .none) {
const submission = &b.storage[index.toIndex()].submission;
switch (submission.operation) {
.file_read_streaming => |o| try poll_storage.add(o.file, posix.POLL.IN),
}
index = submission.node.next;
}
}
switch (poll_storage.len) {
0 => return,
1 => if (timeout == .none) {
const op = map_buffer[0];
try operate(t, &operations[op]);
submitComplete(ring, &complete_tail, op);
poll_i = 0;
const index = b.submissions.head;
const storage = &b.storage[index.toIndex()];
const result = try operate(t, storage.submission.operation);
b.submissions = .{ .head = .none, .tail = .none };
switch (b.completions.tail) {
.none => b.completions.head = index,
else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
}
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
b.completions.tail = index;
return;
},
else => {},
}
const t_io = ioBasic(t);
const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock;
const max_poll_ms = std.math.maxInt(i32);
while (true) {
const timeout_ms: i32 = t: {
if (b.user.complete_head != complete_tail) {
if (b.completions.head != .none) {
// It is legal to call batchWait with already completed
// operations in the ring. In such case, we need to avoid
// blocking in the poll syscall, but we can still take this
@@ -2571,15 +2715,16 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.
const d = deadline orelse break :t -1;
const duration = d.durationFromNow(t_io) catch return error.UnsupportedClock;
if (duration.raw.nanoseconds <= 0) return error.Timeout;
const max_poll_ms = std.math.maxInt(i32);
break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
};
const syscall = try Syscall.start();
const rc = posix.system.poll(&poll_buffer, poll_i, timeout_ms);
const rc = posix.system.poll(&poll_buffer, poll_storage.len, timeout_ms);
syscall.finish();
switch (posix.errno(rc)) {
.SUCCESS => {
if (rc == 0) {
if (b.user.complete_head != complete_tail) {
if (b.completions.head != .none) {
// Since there are already completions available in the
// queue, this is neither a timeout nor a case for
// retrying.
@@ -2590,18 +2735,30 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.
if (deadline == null) continue;
return error.Timeout;
}
while (poll_i != 0) {
poll_i -= 1;
const poll_fd = &poll_buffer[poll_i];
const op = map_buffer[poll_i];
if (poll_fd.revents == 0) {
submit_head = submit_head.prev(len);
ring[submit_head.index(len)] = op;
} else {
try operate(t, &operations[op]);
submitComplete(ring, &complete_tail, op);
}
var prev_index: Io.Operation.OptionalIndex = .none;
var index = b.submissions.head;
for (poll_storage.slice[0..poll_storage.len]) |poll_entry| {
const submission = &b.storage[index.toIndex()].submission;
const next_index = submission.node.next;
if (poll_entry.revents != 0) {
const result = try operate(t, submission.operation);
switch (prev_index) {
.none => b.submissions.head = next_index,
else => b.storage[prev_index.toIndex()].submission.node.next = next_index,
}
if (next_index == .none) b.submissions.tail = prev_index;
switch (b.completions.tail) {
.none => b.completions.head = index,
else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
}
b.completions.tail = index;
b.storage[index.toIndex()] = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
} else prev_index = index;
index = next_index;
}
assert(index == .none);
return;
},
.INTR => continue,
@@ -2610,166 +2767,126 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.
}
}
fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const operations = b.operations;
const len: u31 = @intCast(operations.len);
const ring = b.ring[0..len];
var submit_head = b.impl.submit_head;
const submit_tail = b.user.submit_tail;
b.impl.submit_tail = submit_tail;
var complete_tail = b.impl.complete_tail;
while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) {
const op = ring[submit_head.index(len)];
switch (operations[op]) {
.noop => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = {} };
submitComplete(ring, &complete_tail, op);
},
.file_read_streaming => |*o| _ = o.status.unstarted,
}
}
if (is_windows) {
// Iterate over pending and issue cancelations, then free the allocation for IO_STATUS_BLOCK
if (b.impl.reserved) |reserved| {
const gpa = t.allocator;
const metadatas_ptr: [*]WinOpMetadata = @ptrCast(@alignCast(reserved));
const metadatas = metadatas_ptr[0..b.operations.len];
for (metadatas, 0..) |*metadata, op| {
if (!metadata.pending) continue;
const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING;
if (done) continue;
switch (operations[op]) {
.noop => unreachable,
.file_read_streaming => |*o| {
_ = windows.ntdll.NtCancelIoFile(o.file.handle, &metadata.iosb);
},
}
}
for (metadatas) |*metadata| {
if (!metadata.pending) continue;
while (@atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) == .PENDING) {
waitForApcOrAlert();
}
}
gpa.free(metadatas);
b.impl.reserved = null;
}
}
b.impl.submit_head = submit_tail;
b.impl.complete_tail = complete_tail;
b.user.complete_tail = complete_tail;
}
const WinOpMetadata = struct {
const WindowsBatchPendingOperationContext = extern struct {
file: windows.HANDLE,
iosb: windows.IO_STATUS_BLOCK,
pending: bool,
const Erased = [3]usize;
comptime {
assert(@sizeOf(Erased) <= @sizeOf(WindowsBatchPendingOperationContext));
}
fn toErased(context: *WindowsBatchPendingOperationContext) *Erased {
return @ptrCast(context);
}
fn fromErased(erased: *Erased) *WindowsBatchPendingOperationContext {
return @ptrCast(erased);
}
};
fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.WaitError!void {
const operations = b.operations;
const len: u31 = @intCast(operations.len);
const ring = b.ring[0..len];
var submit_head = b.impl.submit_head;
const submit_tail = b.user.submit_tail;
b.impl.submit_tail = submit_tail;
var complete_tail = b.impl.complete_tail;
const metadatas_ptr: [*]WinOpMetadata = if (b.impl.reserved) |reserved| @ptrCast(@alignCast(reserved)) else a: {
const gpa = t.allocator;
const metadatas = gpa.alloc(WinOpMetadata, operations.len) catch return error.ConcurrencyUnavailable;
b.impl.reserved = metadatas.ptr;
@memset(metadatas, .{ .iosb = undefined, .pending = false });
break :a metadatas.ptr;
};
const metadatas = metadatas_ptr[0..operations.len];
defer {
b.impl.submit_head = submit_head;
b.impl.complete_tail = complete_tail;
b.user.complete_tail = complete_tail;
}
while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) {
const op = ring[submit_head.index(len)];
const operation = &operations[op];
const metadata = &metadatas[op];
metadata.* = .{ .iosb = .{
.u = .{ .Status = .PENDING },
.Information = 0,
}, .pending = false };
switch (operation.*) {
.noop => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = {} };
submitComplete(ring, &complete_tail, op);
},
.file_read_streaming => |*o| {
_ = o.status.unstarted;
try ntReadFile(o.file.handle, o.data, &metadata.iosb);
if (@atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) == .PENDING) {
o.status = .{ .pending = b };
metadata.pending = true;
} else {
o.status = .{ .result = ntReadFileResult(&metadata.iosb) };
submitComplete(ring, &complete_tail, op);
}
},
}
}
const deadline: ?Io.Clock.Timestamp = timeout.toDeadline(ioBasic(t)) catch |err| switch (err) {
error.Unexpected => deadline: {
recoverableOsBugDetected();
break :deadline .{ .raw = .{ .nanoseconds = 0 }, .clock = .awake };
},
error.UnsupportedClock => |e| return e,
};
while (true) {
var any_pending = false;
for (metadatas, 0..) |*metadata, op_usize| {
if (!metadata.pending) continue;
any_pending = true;
const op: u31 = @intCast(op_usize);
const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING;
switch (operations[op]) {
.noop => unreachable,
.file_read_streaming => |*o| {
assert(o.status.pending == b);
if (!done) continue;
o.status = .{ .result = ntReadFileResult(&metadata.iosb) };
},
/// Cancels a batch. First recycles every queued-but-unsubmitted operation
/// from `b.submissions` onto the tail of the unused free list; then, on
/// Windows, requests kernel cancellation of each in-flight operation and
/// drains the resulting APCs until `b.pending` is empty; on other targets,
/// frees the pollfd context allocated for the batch, if any.
///
/// NOTE(review): the original text of this function in the diff view had
/// lines from a neighboring hunk interleaved into the loop body
/// (`metadata.pending`, `NtDelayExecution`, `return error.Timeout`, etc.),
/// which are out of scope in this `void` function; they are removed here.
fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    {
        var tail_index = b.unused.tail;
        defer b.unused.tail = tail_index;
        var index = b.submissions.head;
        // Inert in a void function (no error path); preserved from source.
        errdefer b.submissions.head = index;
        while (index != .none) {
            // Read the successor before the slot's union payload is
            // repurposed as `.unused` below.
            const next_index = b.storage[index.toIndex()].submission.node.next;
            // Append this slot to the tail of the unused list.
            switch (tail_index) {
                .none => b.unused.head = index,
                else => b.storage[tail_index.toIndex()].unused.next = index,
            }
            b.storage[index.toIndex()] = .{ .unused = .{ .prev = tail_index, .next = .none } };
            tail_index = index;
            index = next_index;
        }
        b.submissions = .{ .head = .none, .tail = .none };
    }
    if (is_windows) {
        // Ask the kernel to cancel every in-flight operation; completion
        // (canceled or otherwise) is delivered via the batch APC routine.
        var index = b.pending.head;
        while (index != .none) {
            const pending = &b.storage[index.toIndex()].pending;
            const context: *WindowsBatchPendingOperationContext = .fromErased(&pending.context);
            _ = windows.ntdll.NtCancelIoFile(context.file, &context.iosb);
            index = pending.node.next;
        }
        // Wait alertably so the APCs run and unlink slots from `b.pending`.
        while (b.pending.head != .none) waitForApcOrAlert();
    } else if (b.context) |context| {
        t.allocator.free(@as([*]posix.pollfd, @ptrCast(@alignCast(context)))[0..b.storage.len]);
        b.context = null;
    }
    assert(b.pending.head == .none);
}
/// I/O completion APC routine registered by `batchAwaitWindows` for each
/// batch operation; `apc_context` is the owning `Io.Batch`. Delivered while
/// the submitting thread waits alertably (see the `waitForApcOrAlert` drain
/// loop in batch cancellation). Unlinks the finished operation from the
/// batch's pending list, then either recycles its slot (cancellation) or
/// records its result on the completions list.
fn batchApc(apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void {
    const b: *Io.Batch = @ptrCast(@alignCast(apc_context));
    // Recover the operation's bookkeeping from the iosb pointer: the iosb
    // is embedded in a WindowsBatchPendingOperationContext, which itself is
    // the type-erased `context` field of the slot's `.pending` payload.
    const context: *WindowsBatchPendingOperationContext = @fieldParentPtr("iosb", iosb);
    const erased_context = context.toErased();
    const pending: *Io.Operation.Storage.Pending = @fieldParentPtr("context", erased_context);
    // Unlink this slot from the doubly linked pending list (head/tail are
    // patched when the neighbor index is `.none`).
    switch (pending.node.prev) {
        .none => b.pending.head = pending.node.next,
        else => |prev_index| b.storage[prev_index.toIndex()].pending.node.next = pending.node.next,
    }
    switch (pending.node.next) {
        .none => b.pending.tail = pending.node.prev,
        else => |next_index| b.storage[next_index.toIndex()].pending.node.prev = pending.node.prev,
    }
    const storage: *Io.Operation.Storage = @fieldParentPtr("pending", pending);
    // Pointer arithmetic yields this slot's index within `b.storage`.
    const index = storage - b.storage.ptr;
    switch (iosb.u.Status) {
        .CANCELLED => {
            // Canceled: no completion is reported; return the slot to the
            // tail of the unused free list.
            const tail_index = b.unused.tail;
            switch (tail_index) {
                .none => b.unused.head = .fromIndex(index),
                else => b.storage[tail_index.toIndex()].unused.next = .fromIndex(index),
            }
            storage.* = .{ .unused = .{ .prev = tail_index, .next = .none } };
            b.unused.tail = .fromIndex(index);
        },
        else => {
            // Any non-cancel status (success or failure): append the slot to
            // the completions list and store the decoded result.
            switch (b.completions.tail) {
                .none => b.completions.head = .fromIndex(index),
                else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = .fromIndex(index),
            }
            b.completions.tail = .fromIndex(index);
            // Must read `pending.tag` before the `storage.*` write below:
            // `pending` aliases the same union slot being overwritten.
            const result: Io.Operation.Result = switch (pending.tag) {
                .file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) },
            };
            storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
        },
    }
}
/// Submits every operation queued in `b.submissions` to the kernel: each
/// slot is converted in place to `.pending`, its NT operation is started
/// with `batchApc` as the completion routine, and the slot is linked onto
/// the tail of `b.pending`. On full success the submissions list is
/// emptied. If a submission fails, the failing slot is restored to
/// `.submission` and `b.submissions.head` is rewound to it, so operations
/// already handed to the kernel stay pending while the rest are retained.
fn batchAwaitWindows(b: *Io.Batch) Io.Cancelable!void {
    var index = b.submissions.head;
    // On error, keep the unsubmitted suffix of the list reachable.
    errdefer b.submissions.head = index;
    while (index != .none) {
        const storage = &b.storage[index.toIndex()];
        // Copy the submission out first: the union slot is repurposed as
        // `.pending` below, but `submission.node.next` is still needed to
        // advance the loop afterwards.
        const submission = storage.submission;
        // Roll the slot back to its submission state if starting it fails.
        errdefer storage.* = .{ .submission = submission };
        storage.* = .{ .pending = .{
            .node = .{ .prev = b.pending.tail, .next = .none },
            .tag = submission.operation,
            .context = undefined, // initialized per-operation just below
        } };
        const context: *WindowsBatchPendingOperationContext = .fromErased(&storage.pending.context);
        switch (submission.operation) {
            .file_read_streaming => |o| {
                context.file = o.file.handle;
                // Completion is delivered to `batchApc` (with the batch as
                // APC context) when this thread next waits alertably.
                try ntReadFile(o.file.handle, o.data, &batchApc, b, &context.iosb);
            },
        }
        // Successfully started: link the slot onto the pending list's tail.
        switch (b.pending.tail) {
            .none => b.pending.head = index,
            else => |tail_index| b.storage[tail_index.toIndex()].pending.node.next = index,
        }
        b.pending.tail = index;
        index = submission.node.next;
    }
    b.submissions = .{ .head = .none, .tail = .none };
}
fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void {
@@ -8701,7 +8818,7 @@ fn fileReadStreamingWindows(file: File, data: []const []u8) File.ReadStreamingEr
.u = .{ .Status = .PENDING },
.Information = 0,
};
try ntReadFile(file.handle, data, &io_status_block);
try ntReadFile(file.handle, data, &noopApc, null, &io_status_block);
while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
// Once we get here we must not return from the function until the
// operation completes, thereby releasing reference to io_status_block.
@@ -8736,12 +8853,20 @@ fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize {
}
}
fn ntReadFile(handle: windows.HANDLE, data: []const []u8, iosb: *windows.IO_STATUS_BLOCK) Io.Cancelable!void {
fn ntReadFile(
handle: windows.HANDLE,
data: []const []u8,
apcRoutine: ?*const windows.IO_APC_ROUTINE,
apc_context: ?*anyopaque,
iosb: *windows.IO_STATUS_BLOCK,
) Io.Cancelable!void {
var index: usize = 0;
while (index < data.len and data[index].len == 0) index += 1;
if (index == data.len) {
iosb.u.Status = .SUCCESS;
iosb.Information = 0;
iosb.* = .{ .u = .{ .Status = .SUCCESS }, .Information = 0 };
if (apcRoutine) |routine| if (routine != &noopApc) {
_ = windows.ntdll.NtQueueApcThread(windows.current_process, routine, apc_context, iosb, null);
};
return;
}
const buffer = data[index];
@@ -8750,8 +8875,8 @@ fn ntReadFile(handle: windows.HANDLE, data: []const []u8, iosb: *windows.IO_STAT
while (true) switch (windows.ntdll.NtReadFile(
handle,
null, // event
noopApc, // apc callback
null, // apc context
apcRoutine,
apc_context,
iosb,
buffer.ptr,
@min(std.math.maxInt(u32), buffer.len),