std.Io.Threaded: batchWait and batchCancel for Windows

This commit is contained in:
Andrew Kelley
2026-01-26 19:07:01 -08:00
parent 351a3dd997
commit e1604c1923
3 changed files with 226 additions and 133 deletions
-1
View File
@@ -1721,7 +1721,6 @@ fn evalZigTest(
// a crash of some kind. Either way, the child will terminate by itself -- wait for it.
const stderr_reader = multi_reader.reader(1);
const stderr_owned = try arena.dupe(u8, stderr_reader.buffered());
stderr_reader.tossBuffered();
// Clean up everything and wait for the child to exit.
child.stdin.?.close(io);
+8
View File
@@ -350,6 +350,8 @@ pub const Batch = struct {
}
};
/// After calling this, it is safe to unconditionally defer a call to
/// `cancel`.
pub fn init(operations: []Operation, ring: []u32) Batch {
const len: u31 = @intCast(operations.len);
assert(ring.len == len);
@@ -408,12 +410,18 @@ pub const Batch = struct {
/// Starts work on any submitted operations and returns when at least one has completeed.
///
/// Returns `error.Timeout` if `timeout` expires first.
///
/// Depending on the `Io` implementation, may allocate resources that are
/// freed with `cancel`, even if an error is returned.
pub fn wait(b: *Batch, io: Io, timeout: Timeout) WaitError!void {
return io.vtable.batchWait(io.userdata, b, timeout);
}
/// Returns after all `operations` have completed. Operations which have not completed
/// after this function returns were successfully dropped and had no side effects.
///
/// This function is idempotent with respect to itself and `wait`. It is
/// safe to unconditionally `defer` a call to this function after `init`.
pub fn cancel(b: *Batch, io: Io) void {
return io.vtable.batchCancel(io.userdata, b);
}
+218 -132
View File
@@ -1255,6 +1255,32 @@ const AlertableSyscall = struct {
assert(is_windows);
}
fn start() Io.Cancelable!AlertableSyscall {
const thread = Thread.current orelse return .{ .thread = null };
switch (thread.cancel_protection) {
.blocked => return .{ .thread = null },
.unblocked => {},
}
const old_status = thread.status.fetchOr(.{
.cancelation = @enumFromInt(0b010),
.awaitable = .null,
}, .monotonic);
switch (old_status.cancelation) {
.parked => unreachable,
.blocked => unreachable,
.blocked_alertable => unreachable,
.blocked_canceling => unreachable,
.blocked_alertable_canceling => unreachable,
.none => return .{ .thread = thread }, // new status is `.blocked_alertable`
.canceling => {
// Status is unchanged (still `.canceling`)---change to `.canceled` before return.
thread.status.store(.{ .cancelation = .canceled, .awaitable = old_status.awaitable }, .monotonic);
return error.Canceled;
},
.canceled => return .{ .thread = null }, // new status is `.canceled` (unchanged)
}
}
fn checkCancel(s: AlertableSyscall) Io.Cancelable!void {
comptime assert(is_windows);
const thread = s.thread orelse return;
@@ -2501,10 +2527,10 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.
const op = ring[submit_head.index(len)];
const operation = &operations[op];
switch (operation.*) {
.noop => {
try operate(t, operation);
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
.noop => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = {} };
submitComplete(ring, &complete_tail, op);
},
.file_read_streaming => |*o| {
_ = o.status.unstarted;
@@ -2524,8 +2550,7 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.
1 => if (timeout == .none) {
const op = map_buffer[0];
try operate(t, &operations[op]);
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
submitComplete(ring, &complete_tail, op);
poll_i = 0;
return;
},
@@ -2560,8 +2585,7 @@ fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.
ring[submit_head.index(len)] = op;
} else {
try operate(t, &operations[op]);
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
submitComplete(ring, &complete_tail, op);
}
}
return;
@@ -2584,19 +2608,49 @@ fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) {
const op = ring[submit_head.index(len)];
switch (operations[op]) {
.noop => {
operate(t, &operations[op]) catch unreachable;
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
.noop => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = {} };
submitComplete(ring, &complete_tail, op);
},
.file_read_streaming => |*o| _ = o.status.unstarted,
}
}
if (is_windows) {
// Iterate over pending and issue cancelations, then free the allocation for IO_STATUS_BLOCK
if (b.impl.reserved) |reserved| {
const gpa = t.allocator;
const metadatas_ptr: [*]WinOpMetadata = @ptrCast(@alignCast(reserved));
const metadatas = metadatas_ptr[0..b.operations.len];
for (metadatas, 0..) |*metadata, op| {
const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING;
if (done) continue;
switch (operations[op]) {
.noop => unreachable,
.file_read_streaming => |*o| {
_ = windows.ntdll.NtCancelIoFile(o.file.handle, &metadata.iosb);
},
}
}
for (metadatas) |*metadata| {
while (@atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) == .PENDING) {
waitForApcOrAlert();
}
}
gpa.free(metadatas);
b.impl.reserved = null;
}
}
b.impl.submit_head = submit_tail;
b.impl.complete_tail = complete_tail;
b.user.complete_tail = complete_tail;
}
const WinOpMetadata = struct {
iosb: windows.IO_STATUS_BLOCK,
pending: bool,
};
fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.WaitError!void {
const operations = b.operations;
const len: u31 = @intCast(operations.len);
@@ -2606,16 +2660,16 @@ fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.Wa
b.impl.submit_tail = submit_tail;
var complete_tail = b.impl.complete_tail;
var overlapped_buffer: [poll_buffer_len]windows.OVERLAPPED = undefined;
var handles_buffer: [poll_buffer_len]windows.HANDLE = undefined;
var map_buffer: [poll_buffer_len]u32 = undefined; // handles_buffer index to operations index
var buffer_i: usize = 0;
const metadatas_ptr: [*]WinOpMetadata = if (b.impl.reserved) |reserved| @ptrCast(@alignCast(reserved)) else a: {
const gpa = t.allocator;
const metadatas = gpa.alloc(WinOpMetadata, operations.len) catch return error.ConcurrencyUnavailable;
b.impl.reserved = metadatas.ptr;
@memset(metadatas, .{ .iosb = undefined, .pending = false });
break :a metadatas.ptr;
};
const metadatas = metadatas_ptr[0..operations.len];
defer {
for (map_buffer[0..buffer_i]) |op| {
submit_head = submit_head.prev(len);
ring[submit_head.index(len)] = op;
}
b.impl.submit_head = submit_head;
b.impl.complete_tail = complete_tail;
b.user.complete_tail = complete_tail;
@@ -2624,74 +2678,76 @@ fn batchWaitWindows(t: *Threaded, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.Wa
while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) {
const op = ring[submit_head.index(len)];
const operation = &operations[op];
const metadata = &metadatas[op];
metadata.* = .{ .iosb = undefined, .pending = false };
switch (operation.*) {
.noop => {
try operate(t, operation);
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
.noop => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = {} };
submitComplete(ring, &complete_tail, op);
},
.file_read_streaming => |*o| {
_ = o.status.unstarted;
if (handles_buffer.len - buffer_i == 0) return error.ConcurrencyUnavailable;
const overlapped = &overlapped_buffer[buffer_i];
overlapped.* = .{
.Internal = 0,
.InternalHigh = 0,
.DUMMYUNIONNAME = .{ .Pointer = null },
.hEvent = null,
};
var n: windows.DWORD = undefined;
const buf = o.data[0];
const buf_len = std.math.lossyCast(windows.DWORD, buf.len);
if (windows.kernel32.ReadFile(o.file.handle, buf.ptr, buf_len, &n, overlapped) == 0) {
@panic("TODO");
switch (try ntReadFile(o.file.handle, o.data, &metadata.iosb)) {
.status => {
o.status = .{ .result = ntReadFileResult(&metadata.iosb) };
submitComplete(ring, &complete_tail, op);
},
.pending => {
o.status = .{ .pending = b };
metadata.pending = true;
},
}
handles_buffer[buffer_i] = o.file.handle;
map_buffer[buffer_i] = op;
buffer_i += 1;
},
}
}
switch (buffer_i) {
0 => return,
1 => if (timeout == .none) {
const op = map_buffer[0];
try operate(t, &operations[op]);
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
buffer_i = 0;
return;
},
else => {},
}
var delay_interval: windows.LARGE_INTEGER = timeoutToWindowsInterval(timeout);
const handles = handles_buffer[0..buffer_i];
const map = map_buffer[0..buffer_i];
const syscall: Syscall = try .start();
const index_result = windows.WaitForMultipleObjectsEx(handles, false, windows.INFINITE, true);
syscall.finish();
const index = index_result catch |err| switch (err) {
error.Unexpected => @panic("TODO"),
error.WaitAbandoned => @panic("TODO"),
error.WaitTimeOut => @panic("TODO"),
};
var n: windows.DWORD = undefined;
if (0 == windows.kernel32.GetOverlappedResult(handles[index], &overlapped_buffer[index], &n, 0)) {
switch (windows.GetLastError()) {
.BROKEN_PIPE => @panic("TODO"),
.OPERATION_ABORTED => @panic("TODO"),
else => @panic("TODO"),
while (true) {
const alertable_syscall = try AlertableSyscall.start();
const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval);
alertable_syscall.finish();
switch (delay_rc) {
.SUCCESS => {
// The thread woke due to the timeout. Although spurious
// timeouts are OK, when no deadline is passed we must not
// return `error.Timeout`.
if (timeout != .none) return error.Timeout;
},
else => {},
}
} else switch (operations[map[index]]) {
.noop => unreachable,
.file_read_streaming => |*o| {
o.status = .{ .result = n };
},
var any_done = false;
var any_pending = false;
for (metadatas, 0..) |*metadata, op_usize| {
if (!metadata.pending) continue;
any_pending = true;
const op: u31 = @intCast(op_usize);
const done = @atomicLoad(windows.NTSTATUS, &metadata.iosb.u.Status, .acquire) != .PENDING;
switch (operations[op]) {
.noop => unreachable,
.file_read_streaming => |*o| {
assert(o.status.pending == b);
if (!done) continue;
o.status = .{ .result = ntReadFileResult(&metadata.iosb) };
},
}
any_done = true;
metadata.pending = false;
submitComplete(ring, &complete_tail, op);
}
if (any_done) return;
if (!any_pending) return;
}
}
fn submitComplete(ring: []u32, complete_tail: *Io.Batch.RingIndex, op: u32) void {
const ct = complete_tail.*;
const len: u31 = @intCast(ring.len);
ring[ct.index(len)] = op;
complete_tail.* = ct.next(len);
}
const dirCreateDir = switch (native_os) {
.windows => dirCreateDirWindows,
.wasi => dirCreateDirWasi,
@@ -5529,7 +5585,7 @@ fn dirRealPathFileWindows(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8,
fn realPathWindows(h_file: windows.HANDLE, out_buffer: []u8) File.RealPathError!usize {
var wide_buf: [windows.PATH_MAX_WIDE]u16 = undefined;
// TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks
// TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks
try Thread.checkCancel();
const wide_slice = try windows.GetFinalPathNameByHandle(h_file, .{}, &wide_buf);
@@ -8617,70 +8673,42 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz
}
fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!usize {
var index: usize = 0;
while (index < data.len and data[index].len == 0) index += 1;
if (index == data.len) return 0;
const buffer = data[index];
var io_status_block: windows.IO_STATUS_BLOCK = undefined;
const syscall: Syscall = try .start();
while (true) {
io_status_block.u.Status = .PENDING;
switch (windows.ntdll.NtReadFile(
file.handle,
null, // event
noopApc, // apc callback
null, // apc context
&io_status_block,
buffer.ptr,
@min(std.math.maxInt(u32), buffer.len),
null, // byte offset
null, // key
)) {
.SUCCESS, .END_OF_FILE, .PIPE_BROKEN => {
syscall.finish();
return io_status_block.Information;
},
.PENDING => break,
.CANCELLED => {
try syscall.checkCancel();
continue;
},
.INVALID_DEVICE_REQUEST => return syscall.fail(error.IsDir),
.LOCK_NOT_GRANTED => return syscall.fail(error.LockViolation),
.ACCESS_DENIED => return syscall.fail(error.AccessDenied),
.INVALID_PARAMETER => |err| return syscall.ntstatusBug(err), // streaming read of async mode file
else => |status| return syscall.unexpectedNtstatus(status),
}
}
{
// Once we get here we received PENDING so we must not return from the
// function until the operation completes.
defer while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
waitForApcOrAlert();
};
if (ntReadFile(file.handle, data, &io_status_block)) |result| switch (result) {
.status => return ntReadFileResult(&io_status_block),
.pending => {
// Once we get here we received PENDING so we must not return from the
// function until the operation completes.
defer while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
waitForApcOrAlert();
};
const alertable_syscall = syscall.toAlertable() catch |err| switch (err) {
error.Canceled => |e| {
_ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block);
return e;
},
};
defer alertable_syscall.finish();
waitForApcOrAlert();
while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
alertable_syscall.checkCancel() catch |err| switch (err) {
const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) {
error.Canceled => |e| {
_ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block);
return e;
},
};
defer alertable_syscall.finish();
waitForApcOrAlert();
}
}
while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
alertable_syscall.checkCancel() catch |err| switch (err) {
error.Canceled => |e| {
_ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block);
return e;
},
};
waitForApcOrAlert();
}
},
} else |err| return err;
return ntReadFileResult(&io_status_block);
}
fn ntReadFileResult(io_status_block: *windows.IO_STATUS_BLOCK) !usize {
switch (io_status_block.u.Status) {
.SUCCESS, .END_OF_FILE, .PIPE_BROKEN => return io_status_block.Information,
.PENDING => unreachable, // cannot return until the operation completes
.PENDING => unreachable,
.INVALID_DEVICE_REQUEST => return error.IsDir,
.LOCK_NOT_GRANTED => return error.LockViolation,
.ACCESS_DENIED => return error.AccessDenied,
@@ -8688,6 +8716,47 @@ fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!us
}
}
fn ntReadFile(handle: windows.HANDLE, data: []const []u8, iosb: *windows.IO_STATUS_BLOCK) Io.Cancelable!enum { status, pending } {
var index: usize = 0;
while (index < data.len and data[index].len == 0) index += 1;
if (index == data.len) {
iosb.u.Status = .SUCCESS;
iosb.Information = 0;
return .status;
}
const buffer = data[index];
const syscall: Syscall = try .start();
while (true) {
iosb.u.Status = .PENDING;
switch (windows.ntdll.NtReadFile(
handle,
null, // event
noopApc, // apc callback
null, // apc context
iosb,
buffer.ptr,
@min(std.math.maxInt(u32), buffer.len),
null, // byte offset
null, // key
)) {
.PENDING => {
syscall.finish();
return .pending;
},
.CANCELLED => {
try syscall.checkCancel();
continue;
},
else => |status| {
syscall.finish();
iosb.u.Status = status;
return .status;
},
}
}
}
fn fileReadPositionalPosix(file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize {
if (!have_preadv) @compileError("TODO implement fileReadPositionalPosix for cursed operating systems that don't support preadv (it's only Haiku)");
@@ -9318,7 +9387,7 @@ fn processExecutablePath(userdata: ?*anyopaque, out_buffer: []u8) process.Execut
};
defer w.CloseHandle(h_file);
// TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks
// TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks
try Thread.checkCancel();
const wide_slice = try w.GetFinalPathNameByHandle(h_file, .{}, &path_name_w_buf.data);
@@ -12989,7 +13058,7 @@ fn processSetCurrentDir(userdata: ?*anyopaque, dir: Dir) process.SetCurrentDirEr
if (is_windows) {
var dir_path_buffer: [windows.PATH_MAX_WIDE]u16 = undefined;
// TODO move GetFinalPathNameByHandle logic into std.Io.Threaded and add cancel checks
// TODO move GetFinalPathNameByHandle logic into Io.Threaded and add cancel checks
try Thread.checkCancel();
const dir_path = try windows.GetFinalPathNameByHandle(dir.handle, .{}, &dir_path_buffer);
const path_len_bytes = std.math.cast(u16, dir_path.len * 2) orelse return error.NameTooLong;
@@ -16657,7 +16726,7 @@ const parking_sleep = struct {
/// Spurious wakeups are possible.
///
/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation.
fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void {
fn park(opt_deadline: ?Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void {
comptime assert(use_parking_futex or use_parking_sleep);
switch (native_os) {
.windows => {
@@ -16713,6 +16782,23 @@ fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) err
}
}
fn timeoutToWindowsInterval(timeout: Io.Timeout) windows.LARGE_INTEGER {
switch (timeout) {
.none => {
return std.math.minInt(windows.LARGE_INTEGER); // infinite timeout
},
.deadline => |deadline| {
const nanoseconds = deadline.raw.nanoseconds;
return @intCast(@divTrunc(nanoseconds, 100));
},
.duration => |duration| {
const now_timestamp = nowWindows(duration.clock) catch unreachable;
const deadline_ns = now_timestamp.nanoseconds + duration.raw.nanoseconds;
return @intCast(@divTrunc(deadline_ns, 100));
},
}
}
const UnparkTid = switch (native_os) {
// `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread handles?
.windows => usize,