From 8b69341271db18a03a4aa7400a4c6679c5fbf5c6 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 5 Mar 2026 16:34:16 -0800 Subject: [PATCH] std.Io.Threaded: optimize batchAwaitConcurrent for net_receive Eagerly receive messages with MSG_DONTWAIT before polling. This makes the DNS resolution use case end up doing: recvmsg (EAGAIN) poll recvmsg (success) recvmsg (success) rather than: poll recvmsg (success) poll recvmsg (success) --- lib/std/Io/Threaded.zig | 39 ++++++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 07d963b292..e74450aa80 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -2636,8 +2636,9 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper .net_receive => |*o| return .{ .net_receive = o: { if (!have_networking) break :o .{ error.NetworkDown, 0 }; if (is_windows) break :o netReceiveWindows(t, o.socket_handle, o.message_buffer, o.data_buffer, o.flags); - netReceivePosix(o.socket_handle, &o.message_buffer[0], o.data_buffer, o.flags) catch |err| switch (err) { + netReceivePosix(o.socket_handle, &o.message_buffer[0], o.data_buffer, o.flags, false) catch |err| switch (err) { error.Canceled => |e| return e, + error.WouldBlock => unreachable, else => |e| break :o .{ e, 0 }, }; break :o .{ null, 1 }; @@ -2846,19 +2847,41 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout { var index = b.submitted.head; while (index != .none) { - const submission = &b.storage[index.toIndex()].submission; + const storage = &b.storage[index.toIndex()]; + const submission = storage.submission; switch (submission.operation) { .file_read_streaming => |o| try poll_storage.add(o.file.handle, posix.POLL.IN | posix.POLL.ERR), .file_write_streaming => |o| try poll_storage.add(o.file.handle, posix.POLL.OUT | posix.POLL.ERR), .device_io_control => |o| try poll_storage.add(o.file.handle, posix.POLL.IN | posix.POLL.OUT | posix.POLL.ERR), - .net_receive => |o| try poll_storage.add(o.socket_handle, posix.POLL.IN | posix.POLL.ERR), + .net_receive => |*o| nb: { + var data_i: usize = 0; + const result: Io.Operation.Result = .{ .net_receive = for (o.message_buffer, 0..) |*msg, msg_i| { + const remaining_data_buffer = o.data_buffer[data_i..]; + netReceivePosix(o.socket_handle, msg, remaining_data_buffer, o.flags, true) catch |err| switch (err) { + error.Canceled => |e| return e, + error.WouldBlock => { + if (msg_i != 0) break .{ null, msg_i }; + try poll_storage.add(o.socket_handle, posix.POLL.IN | posix.POLL.ERR); + break :nb; + }, + else => |e| break .{ e, 0 }, + }; + data_i += msg.data.len; + } else .{ null, o.message_buffer.len } }; + switch (b.completed.tail) { + .none => b.completed.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, + } + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + b.completed.tail = index; + }, } index = submission.node.next; } } switch (poll_storage.len) { 0 => return, - 1 => if (timeout == .none) { + 1 => if (timeout == .none and b.completed.head == .none) { const index = b.submitted.head; const storage = &b.storage[index.toIndex()]; const result = try operate(t, storage.submission.operation); @@ -13221,7 +13244,8 @@ fn netReceivePosix( message: *net.IncomingMessage, data_buffer: []u8, flags: net.ReceiveFlags, -) net.Socket.ReceiveError!void { + nonblocking: bool, +) (net.Socket.ReceiveError || error{WouldBlock})!void { // recvmmsg is useless, here's why: // * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371) // * it wants iovecs for each message but we have a better API: one data @@ -13232,7 +13256,8 @@ fn netReceivePosix( @as(u32, if (flags.oob) posix.MSG.OOB else 0) | @as(u32, if (flags.peek) posix.MSG.PEEK else 0) | @as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) | - posix.MSG.NOSIGNAL; + posix.MSG.NOSIGNAL | + @as(u32, if (nonblocking) posix.MSG.DONTWAIT else 0); var storage: PosixAddress = undefined; var iov: posix.iovec = .{ .base = data_buffer.ptr, .len = data_buffer.len }; @@ -13280,7 +13305,7 @@ fn netReceivePosix( .PIPE => return syscall.fail(error.SocketUnconnected), .CONNRESET => return syscall.fail(error.ConnectionResetByPeer), .NETDOWN => return syscall.fail(error.NetworkDown), - .AGAIN => |err| return syscall.errnoBug(err), + .AGAIN => return syscall.fail(error.WouldBlock), .BADF => |err| return syscall.errnoBug(err), .FAULT => |err| return syscall.errnoBug(err), .INVAL => |err| return syscall.errnoBug(err),