mirror of
https://github.com/rust-lang/rust.git
synced 2026-06-01 14:10:03 +03:00
Merge pull request #5009 from WhySoBad/network-socket-fix-shutdown
Remove `TcpStream::peer_addr` check for network sockets, test epoll readiness for `shutdown`
This commit is contained in:
@@ -606,11 +606,8 @@ fn connect(
|
||||
),
|
||||
}
|
||||
|
||||
// Mio returns a potentially unconnected stream.
|
||||
// We can be ensured that the connection is established when
|
||||
// [`TcpStream::take_err`] and [`TcpStream::peer_addr`] both
|
||||
// don't return an error after receiving an [`Interest::WRITEABLE`]
|
||||
// event on the stream.
|
||||
// This begins establishing the connection, but does not block until the stream is fully connected.
|
||||
// We deal with that below.
|
||||
match TcpStream::connect(address) {
|
||||
Ok(stream) => {
|
||||
*socket.state.borrow_mut() = SocketState::Connecting(stream);
|
||||
@@ -1262,22 +1259,45 @@ fn shutdown(&mut self, socket: &OpTy<'tcx>, how: &OpTy<'tcx>) -> InterpResult<'t
|
||||
return this.set_last_error_and_return_i32(LibcError("ENOTCONN"));
|
||||
};
|
||||
|
||||
let shut_rd = this.eval_libc_i32("SHUT_RD");
|
||||
let shut_wr = this.eval_libc_i32("SHUT_WR");
|
||||
let shut_rdwr = this.eval_libc_i32("SHUT_RDWR");
|
||||
let is_read_shutdown = how == this.eval_libc_i32("SHUT_RD");
|
||||
let is_write_shutdown = how == this.eval_libc_i32("SHUT_WR");
|
||||
let is_read_write_shutdown = how == this.eval_libc_i32("SHUT_RDWR");
|
||||
|
||||
let how = match () {
|
||||
_ if how == shut_rd => Shutdown::Read,
|
||||
_ if how == shut_wr => Shutdown::Write,
|
||||
_ if how == shut_rdwr => Shutdown::Both,
|
||||
_ if is_read_shutdown => Shutdown::Read,
|
||||
_ if is_write_shutdown => Shutdown::Write,
|
||||
_ if is_read_write_shutdown => Shutdown::Both,
|
||||
// An invalid value was passed to `how`.
|
||||
_ => return this.set_last_error_and_return_i32(LibcError("EINVAL")),
|
||||
};
|
||||
|
||||
match stream.shutdown(how) {
|
||||
Ok(_) => interp_ok(Scalar::from_i32(0)),
|
||||
Err(e) => this.set_last_error_and_return_i32(e),
|
||||
}
|
||||
if let Err(e) = stream.shutdown(how) {
|
||||
return this.set_last_error_and_return_i32(e);
|
||||
};
|
||||
|
||||
drop(state);
|
||||
|
||||
// Because we map cross platform mio readiness to epoll readiness and
|
||||
// the different platforms don't treat `shutdown` the same way, we set
|
||||
// the readiness after a `shutdown` manually to achieve more consistent
|
||||
// epoll readiness. Otherwise we do not generate enough epoll events
|
||||
// on partial shutdowns on Windows hosts.
|
||||
let mut readiness = socket.io_readiness.borrow_mut();
|
||||
// Closing the read end of a socket causes an EPOLLRDHUP event.
|
||||
readiness.read_closed |= is_read_shutdown || is_read_write_shutdown;
|
||||
// Only shutting down the write end doesn't cause an EPOLLHUP event
|
||||
// and thus we won't set the `write_closed` readiness for it here.
|
||||
readiness.write_closed |= is_read_write_shutdown;
|
||||
// The Linux kernel also sets EPOLLIN when both ends of a socket are closed:
|
||||
// <https://github.com/torvalds/linux/blob/HEAD/net/ipv4/tcp.c#L584-L588>
|
||||
readiness.readable |= is_read_write_shutdown;
|
||||
|
||||
drop(readiness);
|
||||
|
||||
// Update the epoll readiness for the socket.
|
||||
this.update_epoll_active_events(socket, /* force_edge */ false)?;
|
||||
|
||||
interp_ok(Scalar::from_i32(0))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1658,24 +1678,20 @@ fn ensure_connected(
|
||||
return action.call(this, Err(()))
|
||||
}
|
||||
|
||||
// There was no error during connecting. We still need to ensure that
|
||||
// the wakeup wasn't spurious. We do this by attempting to read the
|
||||
// peer address of the socket (following the advice given by mio):
|
||||
// There was no error during connecting. Mio advises also reading the peer address
|
||||
// to ensure that socket is actually connected and that it wasn't a spurious wake-up:
|
||||
// <https://docs.rs/mio/latest/mio/net/struct.TcpStream.html#notes>
|
||||
|
||||
match stream.peer_addr() {
|
||||
Ok(_) => { /* fall-through to below */},
|
||||
Err(e) if matches!(e.kind(), io::ErrorKind::NotConnected | io::ErrorKind::InProgress) => {
|
||||
// We received a spurious wakeup from the OS. This should be considered an OS bug:
|
||||
// <https://github.com/tokio-rs/mio/issues/1942#issuecomment-4169378308>
|
||||
panic!("{foreign_name}: received writable event from OS but socket is not yet connected")
|
||||
},
|
||||
Err(_) => {
|
||||
// For all other errors the socket is connected. Since we're not interested in the
|
||||
// peer address and only want to know whether the socket is connected, we can ignore
|
||||
// the error and continue.
|
||||
}
|
||||
}
|
||||
//
|
||||
// Attempting to read the peer address would introduce an edge-case where the
|
||||
// write end of the socket could already be shutdown before it received a
|
||||
// writable event. When we then call [`TcpStream::peer_addr`] we receive an
|
||||
// error. This would need extra state for storing whether the write end was
|
||||
// manually closed using `shutdown`.
|
||||
// Also, tokio doesn't read the peer address and everything seems to be fine,
|
||||
// so we don't do that either:
|
||||
// <https://github.com/tokio-rs/mio/issues/1942#issuecomment-4162607761>
|
||||
// In other words, we are assuming that there will be no spurious
|
||||
// wakeups while establishing the connection.
|
||||
|
||||
// The connection is established.
|
||||
|
||||
|
||||
@@ -25,6 +25,9 @@ fn main() {
|
||||
test_recv_nonblock();
|
||||
#[cfg(not(windows_hosts))]
|
||||
test_send_nonblock();
|
||||
test_shutdown_read_write();
|
||||
test_shutdown_read();
|
||||
test_shutdown_write();
|
||||
}
|
||||
|
||||
/// Test that connecting to a server socket works when the client
|
||||
@@ -340,3 +343,81 @@ fn test_send_nonblock() {
|
||||
|
||||
reader_thread.join().unwrap();
|
||||
}
|
||||
|
||||
/// Test that the EPOLLHUP and EPOLLRDHUP readiness are set when both
|
||||
/// the read and write ends of a socket are closed.
|
||||
fn test_shutdown_read_write() {
|
||||
let (server_sockfd, addr) = net::make_listener_ipv4().unwrap();
|
||||
let client_sockfd =
|
||||
unsafe { errno_result(libc::socket(libc::AF_INET, libc::SOCK_STREAM, 0)).unwrap() };
|
||||
let epfd = unsafe { libc::epoll_create1(0) };
|
||||
|
||||
// Spawn the server thread.
|
||||
let server_thread = thread::spawn(move || net::accept_ipv4(server_sockfd).unwrap());
|
||||
|
||||
net::connect_ipv4(client_sockfd, addr).unwrap();
|
||||
|
||||
epoll_ctl_add(epfd, client_sockfd, EPOLLET | EPOLLHUP | EPOLLRDHUP | EPOLLIN).unwrap();
|
||||
|
||||
// Close the read and write end of the socket.
|
||||
unsafe { libc::shutdown(client_sockfd, libc::SHUT_RDWR) };
|
||||
|
||||
// Ensure that the "read end closed", "write end closed", and "readable" readiness are set.
|
||||
check_epoll_wait::<8>(
|
||||
epfd,
|
||||
&[Ev { events: EPOLLRDHUP | EPOLLHUP | EPOLLIN, data: client_sockfd }],
|
||||
-1,
|
||||
);
|
||||
|
||||
server_thread.join().unwrap();
|
||||
}
|
||||
|
||||
/// Test that the EPOLLRDHUP readiness is set when the read
|
||||
/// end of a socket is closed.
|
||||
fn test_shutdown_read() {
|
||||
let (server_sockfd, addr) = net::make_listener_ipv4().unwrap();
|
||||
let client_sockfd =
|
||||
unsafe { errno_result(libc::socket(libc::AF_INET, libc::SOCK_STREAM, 0)).unwrap() };
|
||||
let epfd = unsafe { libc::epoll_create1(0) };
|
||||
|
||||
// Spawn the server thread.
|
||||
let server_thread = thread::spawn(move || net::accept_ipv4(server_sockfd).unwrap());
|
||||
|
||||
net::connect_ipv4(client_sockfd, addr).unwrap();
|
||||
|
||||
epoll_ctl_add(epfd, client_sockfd, EPOLLET | EPOLLHUP | EPOLLRDHUP).unwrap();
|
||||
|
||||
// Close the read end of the socket.
|
||||
unsafe { libc::shutdown(client_sockfd, libc::SHUT_RD) };
|
||||
|
||||
// Ensure that the "read end closed" readiness is set.
|
||||
check_epoll_wait::<8>(epfd, &[Ev { events: EPOLLRDHUP, data: client_sockfd }], -1);
|
||||
|
||||
server_thread.join().unwrap();
|
||||
}
|
||||
|
||||
/// Test that the EPOLLRDHUP readiness is set when the write
|
||||
/// end of the peer socket is closed.
|
||||
fn test_shutdown_write() {
|
||||
let (server_sockfd, addr) = net::make_listener_ipv4().unwrap();
|
||||
let client_sockfd =
|
||||
unsafe { errno_result(libc::socket(libc::AF_INET, libc::SOCK_STREAM, 0)).unwrap() };
|
||||
let epfd = unsafe { libc::epoll_create1(0) };
|
||||
|
||||
// Spawn the server thread.
|
||||
let server_thread = thread::spawn(move || {
|
||||
let (peerfd, _) = net::accept_ipv4(server_sockfd).unwrap();
|
||||
// Close the write end of the peer socket.
|
||||
unsafe { libc::shutdown(peerfd, libc::SHUT_WR) };
|
||||
});
|
||||
|
||||
net::connect_ipv4(client_sockfd, addr).unwrap();
|
||||
|
||||
epoll_ctl_add(epfd, client_sockfd, EPOLLET | EPOLLHUP | EPOLLRDHUP).unwrap();
|
||||
|
||||
// Ensure that the "read end closed" readiness is set when
|
||||
// the write end of the peer is closed.
|
||||
check_epoll_wait::<8>(epfd, &[Ev { events: EPOLLRDHUP, data: client_sockfd }], -1);
|
||||
|
||||
server_thread.join().unwrap();
|
||||
}
|
||||
|
||||
@@ -553,9 +553,6 @@ fn test_shutdown() {
|
||||
assert_eq!(bytes_read, 0);
|
||||
}
|
||||
|
||||
// TODO: Once epoll is available for TCP sockets, ensure that the rdhup and hup readiness
|
||||
// are set.
|
||||
|
||||
// Closing should affect previously duplicated handles.
|
||||
unsafe {
|
||||
let err =
|
||||
@@ -599,7 +596,6 @@ fn test_shutdown_readable_after_write_close() {
|
||||
unsafe {
|
||||
// Close the write end.
|
||||
libc::shutdown(client_sockfd, libc::SHUT_WR);
|
||||
|
||||
// Ensure that we're still readable.
|
||||
let mut byte = [0u8];
|
||||
errno_result(libc::read(client_sockfd, byte.as_mut_ptr().cast(), 1)).unwrap();
|
||||
@@ -624,7 +620,6 @@ fn test_shutdown_writable_after_read_close() {
|
||||
unsafe {
|
||||
// Close the read end.
|
||||
libc::shutdown(client_sockfd, libc::SHUT_RD);
|
||||
|
||||
// Ensure that we're still writable.
|
||||
errno_result(libc::write(client_sockfd, [1u8].as_ptr().cast(), 1)).unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user