mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-04-27 19:09:47 +03:00
std.Io.Threaded: optimize batchAwaitConcurrent for net_receive
Eagerly receive messages with MSG_DONTWAIT before polling. This makes the DNS resolution use case end up doing: recvmsg (EAGAIN) poll recvmsg (success) recvmsg (success) rather than: poll recvmsg (success) poll recvmsg (success)
This commit is contained in:
+32
-7
@@ -2636,8 +2636,9 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
|
||||
.net_receive => |*o| return .{ .net_receive = o: {
|
||||
if (!have_networking) break :o .{ error.NetworkDown, 0 };
|
||||
if (is_windows) break :o netReceiveWindows(t, o.socket_handle, o.message_buffer, o.data_buffer, o.flags);
|
||||
netReceivePosix(o.socket_handle, &o.message_buffer[0], o.data_buffer, o.flags) catch |err| switch (err) {
|
||||
netReceivePosix(o.socket_handle, &o.message_buffer[0], o.data_buffer, o.flags, false) catch |err| switch (err) {
|
||||
error.Canceled => |e| return e,
|
||||
error.WouldBlock => unreachable,
|
||||
else => |e| break :o .{ e, 0 },
|
||||
};
|
||||
break :o .{ null, 1 };
|
||||
@@ -2846,19 +2847,41 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
|
||||
{
|
||||
var index = b.submitted.head;
|
||||
while (index != .none) {
|
||||
const submission = &b.storage[index.toIndex()].submission;
|
||||
const storage = &b.storage[index.toIndex()];
|
||||
const submission = storage.submission;
|
||||
switch (submission.operation) {
|
||||
.file_read_streaming => |o| try poll_storage.add(o.file.handle, posix.POLL.IN | posix.POLL.ERR),
|
||||
.file_write_streaming => |o| try poll_storage.add(o.file.handle, posix.POLL.OUT | posix.POLL.ERR),
|
||||
.device_io_control => |o| try poll_storage.add(o.file.handle, posix.POLL.IN | posix.POLL.OUT | posix.POLL.ERR),
|
||||
.net_receive => |o| try poll_storage.add(o.socket_handle, posix.POLL.IN | posix.POLL.ERR),
|
||||
.net_receive => |*o| nb: {
|
||||
var data_i: usize = 0;
|
||||
const result: Io.Operation.Result = .{ .net_receive = for (o.message_buffer, 0..) |*msg, msg_i| {
|
||||
const remaining_data_buffer = o.data_buffer[data_i..];
|
||||
netReceivePosix(o.socket_handle, msg, remaining_data_buffer, o.flags, true) catch |err| switch (err) {
|
||||
error.Canceled => |e| return e,
|
||||
error.WouldBlock => {
|
||||
if (msg_i != 0) break .{ null, msg_i };
|
||||
try poll_storage.add(o.socket_handle, posix.POLL.IN | posix.POLL.ERR);
|
||||
break :nb;
|
||||
},
|
||||
else => |e| break .{ e, 0 },
|
||||
};
|
||||
data_i += msg.data.len;
|
||||
} else .{ null, o.message_buffer.len } };
|
||||
switch (b.completed.tail) {
|
||||
.none => b.completed.head = index,
|
||||
else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
|
||||
}
|
||||
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
|
||||
b.completed.tail = index;
|
||||
},
|
||||
}
|
||||
index = submission.node.next;
|
||||
}
|
||||
}
|
||||
switch (poll_storage.len) {
|
||||
0 => return,
|
||||
1 => if (timeout == .none) {
|
||||
1 => if (timeout == .none and b.completed.head == .none) {
|
||||
const index = b.submitted.head;
|
||||
const storage = &b.storage[index.toIndex()];
|
||||
const result = try operate(t, storage.submission.operation);
|
||||
@@ -13221,7 +13244,8 @@ fn netReceivePosix(
|
||||
message: *net.IncomingMessage,
|
||||
data_buffer: []u8,
|
||||
flags: net.ReceiveFlags,
|
||||
) net.Socket.ReceiveError!void {
|
||||
nonblocking: bool,
|
||||
) (net.Socket.ReceiveError || error{WouldBlock})!void {
|
||||
// recvmmsg is useless, here's why:
|
||||
// * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371)
|
||||
// * it wants iovecs for each message but we have a better API: one data
|
||||
@@ -13232,7 +13256,8 @@ fn netReceivePosix(
|
||||
@as(u32, if (flags.oob) posix.MSG.OOB else 0) |
|
||||
@as(u32, if (flags.peek) posix.MSG.PEEK else 0) |
|
||||
@as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) |
|
||||
posix.MSG.NOSIGNAL;
|
||||
posix.MSG.NOSIGNAL |
|
||||
@as(u32, if (nonblocking) posix.MSG.DONTWAIT else 0);
|
||||
|
||||
var storage: PosixAddress = undefined;
|
||||
var iov: posix.iovec = .{ .base = data_buffer.ptr, .len = data_buffer.len };
|
||||
@@ -13280,7 +13305,7 @@ fn netReceivePosix(
|
||||
.PIPE => return syscall.fail(error.SocketUnconnected),
|
||||
.CONNRESET => return syscall.fail(error.ConnectionResetByPeer),
|
||||
.NETDOWN => return syscall.fail(error.NetworkDown),
|
||||
.AGAIN => |err| return syscall.errnoBug(err),
|
||||
.AGAIN => return syscall.fail(error.WouldBlock),
|
||||
.BADF => |err| return syscall.errnoBug(err),
|
||||
.FAULT => |err| return syscall.errnoBug(err),
|
||||
.INVAL => |err| return syscall.errnoBug(err),
|
||||
|
||||
Reference in New Issue
Block a user