diff --git a/lib/std/Io.zig b/lib/std/Io.zig index ba20eccc4e..03b33dae06 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -243,8 +243,6 @@ pub const VTable = struct { netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle, netSocketCreatePair: *const fn (?*anyopaque, net.Socket.CreatePairOptions) net.Socket.CreatePairError![2]net.Socket, netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize }, - /// Returns 0 on end of stream. - netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize, netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize, netWriteFile: *const fn (?*anyopaque, net.Socket.Handle, header: []const u8, *Io.File.Reader, Io.Limit) net.Stream.Writer.WriteFileError!usize, netClose: *const fn (?*anyopaque, handle: []const net.Socket.Handle) void, @@ -261,6 +259,7 @@ pub const Operation = union(enum) { /// other systems this tag is unreachable. device_io_control: DeviceIoControl, net_receive: NetReceive, + net_read: NetRead, pub const Tag = @typeInfo(Operation).@"union".tag_type.?; @@ -386,6 +385,23 @@ pub const Operation = union(enum) { pub const Result = struct { ?net.Socket.ReceiveError, usize }; }; + pub const NetRead = struct { + socket_handle: net.Socket.Handle, + data: [][]u8, + + pub const Error = error{ + SystemResources, + ConnectionResetByPeer, + SocketUnconnected, + /// The file descriptor does not hold the required rights to read + /// from it. + AccessDenied, + NetworkDown, + } || Io.UnexpectedError; + + pub const Result = Error!usize; + }; + pub const Result = Result: { const operation_fields = @typeInfo(Operation).@"union".fields; var field_names: [operation_fields.len][]const u8 = undefined; @@ -2626,7 +2642,6 @@ pub const failing: std.Io = .{ .netConnectUnix = failingNetConnectUnix, .netSocketCreatePair = failingNetSocketCreatePair, .netSend = failingNetSend, - .netRead = failingNetRead, .netWrite = failingNetWrite, .netWriteFile = failingNetWriteFile, .netClose = unreachableNetClose, @@ -2774,6 +2789,7 @@ pub fn failingOperate(userdata: ?*anyopaque, operation: Operation) Cancelable!Op .file_write_streaming => .{ .file_write_streaming = error.InputOutput }, .device_io_control => unreachable, .net_receive => .{ .net_receive = .{ error.NetworkDown, 0 } }, + .net_read => .{ .net_read = error.NetworkDown }, }; } @@ -3377,13 +3393,6 @@ pub fn failingNetSend(userdata: ?*anyopaque, handle: net.Socket.Handle, messages return .{ error.NetworkDown, 0 }; } -pub fn failingNetRead(userdata: ?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize { - _ = userdata; - _ = src; - _ = data; - return error.NetworkDown; -} - pub fn failingNetWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize { _ = userdata; _ = dest; diff --git a/lib/std/Io/Dispatch.zig b/lib/std/Io/Dispatch.zig index 259a233d57..93a1bd93db 100644 --- a/lib/std/Io/Dispatch.zig +++ b/lib/std/Io/Dispatch.zig @@ -459,7 +459,6 @@ pub fn io(ev: *Evented) Io { .netConnectUnix = netConnectUnixUnavailable, .netSocketCreatePair = netSocketCreatePairUnavailable, .netSend = netSendUnavailable, - .netRead = netReadUnavailable, .netWrite = netWriteUnavailable, .netWriteFile = netWriteFileUnavailable, .netClose = netClose, @@ -1713,6 +1712,7 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper }, .device_io_control => |*o| return .{ .device_io_control = try deviceIoControl(o) }, .net_receive => @panic("TODO implement net_receive operation"), + .net_read => @panic("TODO implement net_read operation"), } } @@ -2134,6 +2134,7 @@ fn batchDrainSubmitted( }, .device_io_control => {}, .net_receive => @panic("TODO implement batched net_receive"), + .net_read => @panic("TODO implement batched net_read"), }; if (concurrency) return error.ConcurrencyUnavailable; break :result try operate(ev, storage.submission.operation); @@ -2193,6 +2194,7 @@ fn batchSourceEvent(context: ?*anyopaque) callconv(.c) void { }, .device_io_control => unreachable, .net_receive => @panic("TODO implement batched net_receive"), + .net_read => @panic("TODO implement batched net_read"), }; switch (pending.node.prev) { @@ -4877,18 +4879,6 @@ fn netSendUnavailable( return .{ error.NetworkDown, 0 }; } -fn netReadUnavailable( - userdata: ?*anyopaque, - fd: net.Socket.Handle, - data: [][]u8, -) net.Stream.Reader.Error!usize { - const ev: *Evented = @ptrCast(@alignCast(userdata)); - _ = ev; - _ = fd; - _ = data; - return error.NetworkDown; -} - fn netWriteUnavailable( userdata: ?*anyopaque, handle: net.Socket.Handle, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index a27efafe65..5705e9122a 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1943,10 +1943,6 @@ pub fn io(t: *Threaded) Io { .windows => netShutdownWindows, else => netShutdownPosix, }, - .netRead = switch (native_os) { - .windows => netReadWindows, - else => netReadPosix, - }, .netWrite = switch (native_os) { .windows => netWriteWindows, else => netWritePosix, @@ -2566,6 +2562,12 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper }; break :o .{ null, 1 }; } }, + .net_read => |o| return .{ + .net_read = netRead(o.socket_handle, o.data) catch |err| switch (err) { + error.Canceled => |e| return e, + else => |e| e, + }, + }, } } @@ -2621,6 +2623,14 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void { }; poll_len += 1; }, + .net_read => |o| { + poll_buffer[poll_len] = .{ + .fd = o.socket_handle, + .events = posix.POLL.IN | posix.POLL.ERR, + .revents = 0, + }; + poll_len += 1; + }, } index = submission.node.next; } @@ -2798,6 +2808,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; b.completed.tail = index; }, + .net_read => |o| try poll_storage.add(o.socket_handle, posix.POLL.IN | posix.POLL.ERR), } index = submission.node.next; } @@ -2993,6 +3004,7 @@ fn batchApc( .file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) }, .device_io_control => .{ .device_io_control = iosb.* }, .net_receive => unreachable, + .net_read => unreachable, }; storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; }, @@ -3201,6 +3213,16 @@ fn batchDrainSubmittedWindows(t: *Threaded, b: *Io.Batch, concurrency: bool) (Io .net_receive = netReceiveWindows(t, o.socket_handle, o.message_buffer, o.data_buffer, o.flags), }); }, + .net_read => |*o| { + // TODO integrate with overlapped I/O or equivalent to avoid this error + if (concurrency) return error.ConcurrencyUnavailable; + batchCompleteBlockingWindows(b, operation_userdata, .{ + .net_read = netRead(o.socket_handle, o.data) catch |err| switch (err) { + error.Canceled => |e| return e, + else => |e| e, + }, + }); + }, } index = submission.node.next; } @@ -12549,11 +12571,14 @@ fn deferAcceptAfd(t: *Threaded, listen_handle: net.Socket.Handle, info: windows. } } -fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize { +fn netRead(socket_handle: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize { if (!have_networking) return error.NetworkDown; - const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; + if (is_windows) return netReadWindows(socket_handle, data); + return netReadPosix(socket_handle, data); +} + +fn netReadPosix(fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize { var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined; var i: usize = 0; for (data) |buf| { @@ -12590,7 +12615,6 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net. .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.Timeout, .NOTCAPABLE => return error.AccessDenied, else => |err| return posix.unexpectedErrno(err), } @@ -12622,7 +12646,6 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net. .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.Timeout, .PIPE => return error.SocketUnconnected, .NETDOWN => return error.NetworkDown, else => |err| return posix.unexpectedErrno(err), @@ -12632,11 +12655,7 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net. } } -fn netReadWindows(userdata: ?*anyopaque, socket_handle: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize { - if (!have_networking) return error.NetworkDown; - const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; - +fn netReadWindows(socket_handle: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize { var iovecs: [max_iovecs_len]windows.AFD.WSABUF(.@"var") = undefined; var len: u32 = 0; for (data) |buf| { diff --git a/lib/std/Io/Uring.zig b/lib/std/Io/Uring.zig index 759a9bac20..750465c493 100644 --- a/lib/std/Io/Uring.zig +++ b/lib/std/Io/Uring.zig @@ -779,7 +779,6 @@ pub fn io(ev: *Evented) Io { .netConnectUnix = netConnectUnixUnavailable, .netSocketCreatePair = netSocketCreatePairUnavailable, .netSend = netSendUnavailable, - .netRead = netReadUnavailable, .netWrite = netWriteUnavailable, .netWriteFile = netWriteFileUnavailable, .netClose = netClose, @@ -2105,6 +2104,12 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper }; }, }, + .net_read => |o| .{ + .net_read = r: { + _ = o; + break :r error.NetworkDown; // TODO + }, + }, }; } @@ -2392,6 +2397,10 @@ fn batchDrainSubmitted( _ = o; @panic("TODO implement batchDrainSubmitted for net_receive"); }, + .net_read => |o| { + _ = o; + @panic("TODO implement batchDrainSubmitted for net_read"); + }, })) |result| { switch (batch.completed.tail) { .none => batch.completed.head = index, @@ -2493,6 +2502,7 @@ fn batchDrainReady(batch: *Io.Batch) Io.Timeout.Error!void { }, .device_io_control => unreachable, .net_receive => @panic("TODO"), + .net_read => @panic("TODO"), })) |result| { switch (batch.completed.tail) { .none => batch.completed.head = index, @@ -5142,18 +5152,6 @@ fn netReceive( } } -fn netReadUnavailable( - userdata: ?*anyopaque, - fd: net.Socket.Handle, - data: [][]u8, -) net.Stream.Reader.Error!usize { - const ev: *Evented = @ptrCast(@alignCast(userdata)); - _ = ev; - _ = fd; - _ = data; - return error.NetworkDown; -} - fn netWriteUnavailable( userdata: ?*anyopaque, handle: net.Socket.Handle, diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig index ef99c8e441..2115e73e49 100644 --- a/lib/std/Io/net.zig +++ b/lib/std/Io/net.zig @@ -1245,6 +1245,15 @@ pub const Stream = struct { const max_iovecs_len = 8; + /// This is a low-level API that calls the `Io` interface function directly. + /// For a higher level API, see `reader`. + pub fn read(s: *const Stream, io: Io, data: [][]u8) Reader.Error!usize { + return (try io.operate(.{ .net_read = .{ + .socket_handle = s.socket.handle, + .data = data, + } })).net_read; + } + pub fn close(s: *const Stream, io: Io) void { io.vtable.netClose(io.userdata, (&s.socket.handle)[0..1]); } @@ -1259,16 +1268,7 @@ pub const Stream = struct { stream: Stream, err: ?Error, - pub const Error = error{ - SystemResources, - ConnectionResetByPeer, - Timeout, - SocketUnconnected, - /// The file descriptor does not hold the required rights to read - /// from it. - AccessDenied, - NetworkDown, - } || Io.Cancelable || Io.UnexpectedError; + pub const Error = Io.Operation.NetRead.Error || Io.Cancelable; pub fn init(stream: Stream, io: Io, buffer: []u8) Reader { return .{ @@ -1302,7 +1302,7 @@ pub const Stream = struct { const dest_n, const data_size = try io_r.writableVector(&iovecs_buffer, data); const dest = iovecs_buffer[0..dest_n]; assert(dest[0].len > 0); - const n = io.vtable.netRead(io.userdata, r.stream.socket.handle, dest) catch |err| { + const n = r.stream.read(io, dest) catch |err| { r.err = err; return error.ReadFailed; }; diff --git a/src/IncrementalDebugServer.zig b/src/IncrementalDebugServer.zig index 1252d74e1e..a14ba7c2b3 100644 --- a/src/IncrementalDebugServer.zig +++ b/src/IncrementalDebugServer.zig @@ -86,7 +86,6 @@ fn runServer(ids: *IncrementalDebugServer) void { error.OutOfMemory, error.Unexpected, error.SystemResources, - error.Timeout, error.NetworkDown, error.NetworkUnreachable, error.HostUnreachable,