std.Io: move netRead to become an Operation

This commit is contained in:
xeondev
2026-04-16 15:11:25 +03:00
committed by Andrew Kelley
parent bea4ea5ff8
commit 2b48f559f4
6 changed files with 77 additions and 62 deletions
+19 -10
View File
@@ -243,8 +243,6 @@ pub const VTable = struct {
netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
netSocketCreatePair: *const fn (?*anyopaque, net.Socket.CreatePairOptions) net.Socket.CreatePairError![2]net.Socket,
netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
/// Returns 0 on end of stream.
netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize,
netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
netWriteFile: *const fn (?*anyopaque, net.Socket.Handle, header: []const u8, *Io.File.Reader, Io.Limit) net.Stream.Writer.WriteFileError!usize,
netClose: *const fn (?*anyopaque, handle: []const net.Socket.Handle) void,
@@ -261,6 +259,7 @@ pub const Operation = union(enum) {
/// other systems this tag is unreachable.
device_io_control: DeviceIoControl,
net_receive: NetReceive,
net_read: NetRead,
pub const Tag = @typeInfo(Operation).@"union".tag_type.?;
@@ -386,6 +385,23 @@ pub const Operation = union(enum) {
pub const Result = struct { ?net.Socket.ReceiveError, usize };
};
pub const NetRead = struct {
socket_handle: net.Socket.Handle,
data: [][]u8,
pub const Error = error{
SystemResources,
ConnectionResetByPeer,
SocketUnconnected,
/// The file descriptor does not hold the required rights to read
/// from it.
AccessDenied,
NetworkDown,
} || Io.UnexpectedError;
pub const Result = Error!usize;
};
pub const Result = Result: {
const operation_fields = @typeInfo(Operation).@"union".fields;
var field_names: [operation_fields.len][]const u8 = undefined;
@@ -2626,7 +2642,6 @@ pub const failing: std.Io = .{
.netConnectUnix = failingNetConnectUnix,
.netSocketCreatePair = failingNetSocketCreatePair,
.netSend = failingNetSend,
.netRead = failingNetRead,
.netWrite = failingNetWrite,
.netWriteFile = failingNetWriteFile,
.netClose = unreachableNetClose,
@@ -2774,6 +2789,7 @@ pub fn failingOperate(userdata: ?*anyopaque, operation: Operation) Cancelable!Op
.file_write_streaming => .{ .file_write_streaming = error.InputOutput },
.device_io_control => unreachable,
.net_receive => .{ .net_receive = .{ error.NetworkDown, 0 } },
.net_read => .{ .net_read = error.NetworkDown },
};
}
@@ -3377,13 +3393,6 @@ pub fn failingNetSend(userdata: ?*anyopaque, handle: net.Socket.Handle, messages
return .{ error.NetworkDown, 0 };
}
pub fn failingNetRead(userdata: ?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
_ = userdata;
_ = src;
_ = data;
return error.NetworkDown;
}
pub fn failingNetWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize {
_ = userdata;
_ = dest;
+3 -13
View File
@@ -459,7 +459,6 @@ pub fn io(ev: *Evented) Io {
.netConnectUnix = netConnectUnixUnavailable,
.netSocketCreatePair = netSocketCreatePairUnavailable,
.netSend = netSendUnavailable,
.netRead = netReadUnavailable,
.netWrite = netWriteUnavailable,
.netWriteFile = netWriteFileUnavailable,
.netClose = netClose,
@@ -1713,6 +1712,7 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
},
.device_io_control => |*o| return .{ .device_io_control = try deviceIoControl(o) },
.net_receive => @panic("TODO implement net_receive operation"),
.net_read => @panic("TODO implement net_read operation"),
}
}
@@ -2134,6 +2134,7 @@ fn batchDrainSubmitted(
},
.device_io_control => {},
.net_receive => @panic("TODO implement batched net_receive"),
.net_read => @panic("TODO implement batched net_read"),
};
if (concurrency) return error.ConcurrencyUnavailable;
break :result try operate(ev, storage.submission.operation);
@@ -2193,6 +2194,7 @@ fn batchSourceEvent(context: ?*anyopaque) callconv(.c) void {
},
.device_io_control => unreachable,
.net_receive => @panic("TODO implement batched net_receive"),
.net_read => @panic("TODO implement batched net_read"),
};
switch (pending.node.prev) {
@@ -4877,18 +4879,6 @@ fn netSendUnavailable(
return .{ error.NetworkDown, 0 };
}
fn netReadUnavailable(
userdata: ?*anyopaque,
fd: net.Socket.Handle,
data: [][]u8,
) net.Stream.Reader.Error!usize {
const ev: *Evented = @ptrCast(@alignCast(userdata));
_ = ev;
_ = fd;
_ = data;
return error.NetworkDown;
}
fn netWriteUnavailable(
userdata: ?*anyopaque,
handle: net.Socket.Handle,
+33 -14
View File
@@ -1943,10 +1943,6 @@ pub fn io(t: *Threaded) Io {
.windows => netShutdownWindows,
else => netShutdownPosix,
},
.netRead = switch (native_os) {
.windows => netReadWindows,
else => netReadPosix,
},
.netWrite = switch (native_os) {
.windows => netWriteWindows,
else => netWritePosix,
@@ -2566,6 +2562,12 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
};
break :o .{ null, 1 };
} },
.net_read => |o| return .{
.net_read = netRead(o.socket_handle, o.data) catch |err| switch (err) {
error.Canceled => |e| return e,
else => |e| e,
},
},
}
}
@@ -2621,6 +2623,14 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
};
poll_len += 1;
},
.net_read => |o| {
poll_buffer[poll_len] = .{
.fd = o.socket_handle,
.events = posix.POLL.IN | posix.POLL.ERR,
.revents = 0,
};
poll_len += 1;
},
}
index = submission.node.next;
}
@@ -2798,6 +2808,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
b.completed.tail = index;
},
.net_read => |o| try poll_storage.add(o.socket_handle, posix.POLL.IN | posix.POLL.ERR),
}
index = submission.node.next;
}
@@ -2993,6 +3004,7 @@ fn batchApc(
.file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) },
.device_io_control => .{ .device_io_control = iosb.* },
.net_receive => unreachable,
.net_read => unreachable,
};
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
},
@@ -3201,6 +3213,16 @@ fn batchDrainSubmittedWindows(t: *Threaded, b: *Io.Batch, concurrency: bool) (Io
.net_receive = netReceiveWindows(t, o.socket_handle, o.message_buffer, o.data_buffer, o.flags),
});
},
.net_read => |*o| {
// TODO integrate with overlapped I/O or equivalent to avoid this error
if (concurrency) return error.ConcurrencyUnavailable;
batchCompleteBlockingWindows(b, operation_userdata, .{
.net_read = netRead(o.socket_handle, o.data) catch |err| switch (err) {
error.Canceled => |e| return e,
else => |e| e,
},
});
},
}
index = submission.node.next;
}
@@ -12549,11 +12571,14 @@ fn deferAcceptAfd(t: *Threaded, listen_handle: net.Socket.Handle, info: windows.
}
}
fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
fn netRead(socket_handle: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
if (!have_networking) return error.NetworkDown;
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
if (is_windows) return netReadWindows(socket_handle, data);
return netReadPosix(socket_handle, data);
}
fn netReadPosix(fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
var i: usize = 0;
for (data) |buf| {
@@ -12590,7 +12615,6 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.Timeout,
.NOTCAPABLE => return error.AccessDenied,
else => |err| return posix.unexpectedErrno(err),
}
@@ -12622,7 +12646,6 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.Timeout,
.PIPE => return error.SocketUnconnected,
.NETDOWN => return error.NetworkDown,
else => |err| return posix.unexpectedErrno(err),
@@ -12632,11 +12655,7 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.
}
}
fn netReadWindows(userdata: ?*anyopaque, socket_handle: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
if (!have_networking) return error.NetworkDown;
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
fn netReadWindows(socket_handle: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
var iovecs: [max_iovecs_len]windows.AFD.WSABUF(.@"var") = undefined;
var len: u32 = 0;
for (data) |buf| {
+11 -13
View File
@@ -779,7 +779,6 @@ pub fn io(ev: *Evented) Io {
.netConnectUnix = netConnectUnixUnavailable,
.netSocketCreatePair = netSocketCreatePairUnavailable,
.netSend = netSendUnavailable,
.netRead = netReadUnavailable,
.netWrite = netWriteUnavailable,
.netWriteFile = netWriteFileUnavailable,
.netClose = netClose,
@@ -2105,6 +2104,12 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
};
},
},
.net_read => |o| .{
.net_read = r: {
_ = o;
break :r error.NetworkDown; // TODO
},
},
};
}
@@ -2392,6 +2397,10 @@ fn batchDrainSubmitted(
_ = o;
@panic("TODO implement batchDrainSubmitted for net_receive");
},
.net_read => |o| {
_ = o;
@panic("TODO implement batchDrainSubmitted for net_read");
},
})) |result| {
switch (batch.completed.tail) {
.none => batch.completed.head = index,
@@ -2493,6 +2502,7 @@ fn batchDrainReady(batch: *Io.Batch) Io.Timeout.Error!void {
},
.device_io_control => unreachable,
.net_receive => @panic("TODO"),
.net_read => @panic("TODO"),
})) |result| {
switch (batch.completed.tail) {
.none => batch.completed.head = index,
@@ -5142,18 +5152,6 @@ fn netReceive(
}
}
fn netReadUnavailable(
userdata: ?*anyopaque,
fd: net.Socket.Handle,
data: [][]u8,
) net.Stream.Reader.Error!usize {
const ev: *Evented = @ptrCast(@alignCast(userdata));
_ = ev;
_ = fd;
_ = data;
return error.NetworkDown;
}
fn netWriteUnavailable(
userdata: ?*anyopaque,
handle: net.Socket.Handle,
+11 -11
View File
@@ -1245,6 +1245,15 @@ pub const Stream = struct {
const max_iovecs_len = 8;
/// This is a low-level API that calls the `Io` interface function directly.
/// For a higher level API, see `reader`.
pub fn read(s: *const Stream, io: Io, data: [][]u8) Reader.Error!usize {
return (try io.operate(.{ .net_read = .{
.socket_handle = s.socket.handle,
.data = data,
} })).net_read;
}
pub fn close(s: *const Stream, io: Io) void {
io.vtable.netClose(io.userdata, (&s.socket.handle)[0..1]);
}
@@ -1259,16 +1268,7 @@ pub const Stream = struct {
stream: Stream,
err: ?Error,
pub const Error = error{
SystemResources,
ConnectionResetByPeer,
Timeout,
SocketUnconnected,
/// The file descriptor does not hold the required rights to read
/// from it.
AccessDenied,
NetworkDown,
} || Io.Cancelable || Io.UnexpectedError;
pub const Error = Io.Operation.NetRead.Error || Io.Cancelable;
pub fn init(stream: Stream, io: Io, buffer: []u8) Reader {
return .{
@@ -1302,7 +1302,7 @@ pub const Stream = struct {
const dest_n, const data_size = try io_r.writableVector(&iovecs_buffer, data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = io.vtable.netRead(io.userdata, r.stream.socket.handle, dest) catch |err| {
const n = r.stream.read(io, dest) catch |err| {
r.err = err;
return error.ReadFailed;
};
-1
View File
@@ -86,7 +86,6 @@ fn runServer(ids: *IncrementalDebugServer) void {
error.OutOfMemory,
error.Unexpected,
error.SystemResources,
error.Timeout,
error.NetworkDown,
error.NetworkUnreachable,
error.HostUnreachable,