Merge pull request #5012 from RalfJung/libc-helpers

rename and adjust some libc test utils for better consistency with std
This commit is contained in:
Ralf Jung
2026-05-09 12:32:32 +00:00
committed by GitHub
14 changed files with 172 additions and 271 deletions
@@ -68,14 +68,12 @@ fn main() {
let thread1 = spawn(move || {
unsafe { VAL_ONE = 41 };
let data = "abcde".as_bytes().as_ptr();
let res = unsafe { libc_utils::write_all(fds_a[0], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
let data = "abcde".as_bytes();
libc_utils::write_all(fds_a[0], data).unwrap();
unsafe { VAL_TWO = 51 };
let res = unsafe { libc_utils::write_all(fds_b[0], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
libc_utils::write_all(fds_b[0], data).unwrap();
});
thread::yield_now();
@@ -1,7 +1,8 @@
//! We test that if we requested to read 4 bytes, but actually read 3 bytes,
//! then 3 bytes (not 4) will be initialized.
//@ignore-target: windows # no file system support on Windows
//@compile-flags: -Zmiri-disable-isolation
// Short FD ops can affect the exact error message here
//@compile-flags: -Zmiri-disable-isolation -Zmiri-no-short-fd-operations
use std::ffi::CString;
use std::fs::remove_file;
@@ -21,9 +22,9 @@ fn main() {
let fd = libc::open(cpath.as_ptr(), libc::O_RDONLY);
assert_ne!(fd, -1);
let mut buf: MaybeUninit<[u8; 4]> = std::mem::MaybeUninit::uninit();
// Read as much as we can from a 3-byte file.
let res = libc_utils::read_all(fd, buf.as_mut_ptr().cast::<std::ffi::c_void>(), 4);
assert!(res == 3);
// Do a 4-byte read; this can actually read at most 3 bytes.
let res = libc::read(fd, buf.as_mut_ptr().cast::<std::ffi::c_void>(), 4);
assert!(res <= 3);
buf.assume_init(); //~ERROR: encountered uninitialized memory, but expected an integer
assert_eq!(libc::close(fd), 0);
}
@@ -79,7 +79,7 @@ fn main() {
thread::yield_now();
// Create two events at once.
libc_utils::write_all_from_slice(fd1, &0_u64.to_ne_bytes()).unwrap();
libc_utils::write_all(fd1, &0_u64.to_ne_bytes()).unwrap();
thread1.join().unwrap();
thread2.join().unwrap();
@@ -23,9 +23,7 @@ fn main() {
assert_eq!(res, 0);
let arr1: [u8; 212992] = [1; 212992];
// Exhaust the space in the buffer so the subsequent write will block.
let res =
unsafe { libc_utils::write_all(fds[0], arr1.as_ptr() as *const libc::c_void, 212992) };
assert_eq!(res, 212992);
libc_utils::write_all(fds[0], &arr1).unwrap();
let thread1 = thread::spawn(move || {
let data = "a".as_bytes();
// The write below will be blocked because the buffer is already full.
@@ -24,16 +24,15 @@ fn main() {
thread::yield_now();
let mut buffer = [22u8; 128];
let bytes_written = unsafe {
errno_result(libc_utils::write_all_generic(
unsafe {
libc_utils::write_all_generic(
buffer.as_mut_ptr().cast(),
buffer.len(),
libc_utils::NoRetry,
|buf, len| libc::send(peerfd, buf, len, 0),
))
)
.unwrap()
};
assert_eq!(bytes_written as usize, 128);
});
net::connect_ipv4(client_sockfd, addr).unwrap();
@@ -41,12 +40,12 @@ fn main() {
let reader_thread = thread::spawn(move || {
let mut buffer = [0u8; 8];
unsafe {
errno_result(libc_utils::read_all_generic(
libc_utils::read_exact_generic(
buffer.as_mut_ptr().cast(),
buffer.len(),
libc_utils::NoRetry,
|buf, count| libc::recv(client_sockfd, buf, count, 0),
))
)
.unwrap()
};
assert_eq!(&buffer, &[22u8; 8]);
@@ -54,12 +53,12 @@ fn main() {
let mut buffer = [0u8; 8];
unsafe {
errno_result(libc_utils::read_all_generic(
libc_utils::read_exact_generic(
buffer.as_mut_ptr().cast(),
buffer.len(),
libc_utils::NoRetry,
|buf, count| libc::recv(client_sockfd, buf, count, 0),
))
)
.unwrap()
};
assert_eq!(&buffer, &[22u8; 8]);
@@ -58,7 +58,7 @@ fn test_epoll_block_then_unblock() {
// epoll_wait before triggering notification so it will block then get unblocked before timeout.
let thread1 = thread::spawn(move || {
thread::yield_now();
write_all_from_slice(fds[1], b"abcde").unwrap();
write_all(fds[1], b"abcde").unwrap();
});
check_epoll_wait::<1>(epfd, &[Ev { events: libc::EPOLLIN | libc::EPOLLOUT, data: fds[0] }], 10);
thread1.join().unwrap();
@@ -83,7 +83,7 @@ fn test_notification_after_timeout() {
check_epoll_wait::<1>(epfd, &[], 10);
// Trigger epoll notification after timeout.
write_all_from_slice(fds[1], b"abcde").unwrap();
write_all(fds[1], b"abcde").unwrap();
// Check the result of the notification.
check_epoll_wait::<1>(epfd, &[Ev { events: libc::EPOLLIN | libc::EPOLLOUT, data: fds[0] }], 10);
@@ -106,7 +106,7 @@ fn test_epoll_race() {
// Write to the static mut variable.
unsafe { VAL = 1 };
// Write to the eventfd instance.
write_all_from_slice(fd, &1_u64.to_ne_bytes()).unwrap();
write_all(fd, &1_u64.to_ne_bytes()).unwrap();
});
thread::yield_now();
// epoll_wait for the event to happen.
@@ -130,7 +130,7 @@ fn wakeup_on_new_interest() {
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
// Write to fd[0]
write_all_from_slice(fds[0], b"abcde").unwrap();
write_all(fds[0], b"abcde").unwrap();
// Block a thread on the epoll instance.
let t = std::thread::spawn(move || {
@@ -189,7 +189,7 @@ fn multiple_events_wake_multiple_threads() {
thread::yield_now();
// Trigger the eventfd. This triggers two events at once!
write_all_from_slice(fd1, &0_u64.to_ne_bytes()).unwrap();
write_all(fd1, &0_u64.to_ne_bytes()).unwrap();
// Both threads should have been woken up so that both events can be consumed.
let e1 = t1.join().unwrap();
@@ -56,10 +56,10 @@ fn test_epoll_socketpair() {
let mut fds = [-1, -1];
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
// Write to fd[0]
write_all_from_slice(fds[0], b"abcde").unwrap();
// Write to fds[0]
write_all(fds[0], b"abcde").unwrap();
// Register fd[1] with EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP
// Register fds[1] with EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP
epoll_ctl_add(epfd, fds[1], EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP).unwrap();
// Check result from epoll_wait.
@@ -68,10 +68,10 @@ fn test_epoll_socketpair() {
// Check that this is indeed using "ET" (edge-trigger) semantics: a second epoll should return nothing.
check_epoll_wait_noblock::<8>(epfd, &[]);
// Write some more to fd[0].
write_all_from_slice(fds[0], b"abcde").unwrap();
// Write some more to fds[0].
write_all(fds[0], b"abcde").unwrap();
// This did not change the readiness of fd[1], so we should get no event.
// This did not change the readiness of fds[1], so we should get no event.
// However, Linux seems to always deliver spurious events to the peer on each write,
// so we match that.
check_epoll_wait_noblock::<8>(epfd, &[Ev { data: fds[1], events: EPOLLIN | EPOLLOUT }]);
@@ -98,7 +98,7 @@ fn test_epoll_ctl_mod() {
let mut fds = [-1, -1];
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
// Register fd[1] with EPOLLIN|EPOLLET, and data of "0".
// Register fds[1] with EPOLLIN|EPOLLET, and data of "0".
epoll_ctl(epfd, EPOLL_CTL_ADD, fds[1], Ev { events: EPOLLIN | EPOLLET, data: 0 }).unwrap();
// Check result from epoll_wait. No notification would be returned.
@@ -112,8 +112,8 @@ fn test_epoll_ctl_mod() {
// Write to fds[1] and read from fds[0] to make the notification ready again
// (relying on there always being an event when the buffer gets emptied).
write_all_from_slice(fds[1], "abc".as_bytes()).unwrap();
read_all_into_array::<3>(fds[0]).unwrap();
write_all(fds[1], "abc".as_bytes()).unwrap();
read_exact_array::<3>(fds[0]).unwrap();
// Now that the event is already ready, change the "data" value.
epoll_ctl(epfd, EPOLL_CTL_MOD, fds[1], Ev { events: EPOLLOUT | EPOLLET, data: 2 }).unwrap();
@@ -136,12 +136,10 @@ fn test_epoll_ctl_del() {
let mut fds = [-1, -1];
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
// Write to fd[0]
let data = b"abcde".as_ptr();
let res = unsafe { libc_utils::write_all(fds[0], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
// Write to fds[0]
libc_utils::write_all(fds[0], b"abcde").unwrap();
// Register fd[1] with EPOLLIN|EPOLLOUT|EPOLLET
// Register fds[1] with EPOLLIN|EPOLLOUT|EPOLLET
let mut ev = libc::epoll_event {
events: (libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET) as u32,
u64: u64::try_from(fds[1]).unwrap(),
@@ -168,9 +166,7 @@ fn test_two_epoll_instance() {
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
// Write to the socketpair.
let data = b"abcde".as_ptr();
let res = unsafe { libc_utils::write_all(fds[0], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
libc_utils::write_all(fds[0], b"abcde").unwrap();
// Register one side of the socketpair with EPOLLIN | EPOLLOUT | EPOLLET.
epoll_ctl_add(epfd1, fds[1], libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET).unwrap();
@@ -208,9 +204,7 @@ fn test_two_same_fd_in_same_epoll_instance() {
assert_eq!(res, 0);
// Write to the socketpair.
let data = b"abcde".as_ptr();
let res = unsafe { libc_utils::write_all(fds[0], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
libc_utils::write_all(fds[0], b"abcde").unwrap();
// Two notification should be received.
let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
@@ -227,7 +221,7 @@ fn test_epoll_eventfd() {
let fd = errno_result(unsafe { libc::eventfd(0, flags) }).unwrap();
// Write 1 to the eventfd instance.
libc_utils::write_all_from_slice(fd, &1_u64.to_ne_bytes()).unwrap();
libc_utils::write_all(fd, &1_u64.to_ne_bytes()).unwrap();
// Create an epoll instance.
let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap();
@@ -241,14 +235,14 @@ fn test_epoll_eventfd() {
check_epoll_wait::<8>(epfd, &[(expected_event, expected_value)]);
// Write 0 to the eventfd.
libc_utils::write_all_from_slice(fd, &0_u64.to_ne_bytes()).unwrap();
libc_utils::write_all(fd, &0_u64.to_ne_bytes()).unwrap();
// This does not change the status, so we should get no event.
// However, Linux performs a spurious wakeup.
check_epoll_wait::<8>(epfd, &[(expected_event, expected_value)]);
// Read from the eventfd.
libc_utils::read_all_into_array::<8>(fd).unwrap();
libc_utils::read_exact_array::<8>(fd).unwrap();
// This consumes the event, so the read status is gone. However, deactivation
// does not trigger an event.
@@ -257,9 +251,7 @@ fn test_epoll_eventfd() {
check_epoll_wait::<8>(epfd, &[(expected_event, expected_value)]);
// Write the maximum possible value.
let sized_8_data: [u8; 8] = (u64::MAX - 1).to_ne_bytes();
let res = unsafe { libc_utils::write_all(fd, sized_8_data.as_ptr() as *const libc::c_void, 8) };
assert_eq!(res, 8);
libc_utils::write_all(fd, &(u64::MAX - 1).to_ne_bytes()).unwrap();
// This reactivates reads, therefore triggering an event. Writing is no longer possible.
let expected_event = u32::try_from(libc::EPOLLIN).unwrap();
@@ -282,9 +274,7 @@ fn test_epoll_socketpair_both_sides() {
// Write to fds[1].
// (We do the write after the register here, unlike in `test_epoll_socketpair`, to ensure
// we cover both orders in which this could be done.)
let data = b"abcde".as_ptr();
let res = unsafe { libc_utils::write_all(fds[1], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
libc_utils::write_all(fds[1], b"abcde").unwrap();
// Two notification should be received.
let expected_event0 = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
@@ -297,10 +287,7 @@ fn test_epoll_socketpair_both_sides() {
);
// Read from fds[0].
let mut buf: [u8; 5] = [0; 5];
let res =
unsafe { libc_utils::read_all(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
assert_eq!(res, 5);
let buf = libc_utils::read_exact_array::<5>(fds[0]).unwrap();
assert_eq!(buf, *b"abcde");
// The state of fds[1] does not change (was writable, is writable).
@@ -323,9 +310,7 @@ fn test_closed_fd() {
epoll_ctl_add(epfd, fd, libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET).unwrap();
// Write to the eventfd instance.
let sized_8_data: [u8; 8] = 1_u64.to_ne_bytes();
let res = unsafe { libc_utils::write_all(fd, sized_8_data.as_ptr() as *const libc::c_void, 8) };
assert_eq!(res, 8);
libc_utils::write_all(fd, &1_u64.to_ne_bytes()).unwrap();
// Close the eventfd.
errno_check(unsafe { libc::close(fd) });
@@ -363,10 +348,7 @@ fn test_not_fully_closed_fd() {
check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)]);
// Write to the eventfd instance to produce notification.
let sized_8_data: [u8; 8] = 1_u64.to_ne_bytes();
let res =
unsafe { libc_utils::write_all(newfd, sized_8_data.as_ptr() as *const libc::c_void, 8) };
assert_eq!(res, 8);
libc_utils::write_all(newfd, &1_u64.to_ne_bytes()).unwrap();
// Close the dupped fd.
errno_check(unsafe { libc::close(newfd) });
@@ -383,9 +365,7 @@ fn test_event_overwrite() {
errno_result(unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) }).unwrap();
// Write to the eventfd instance.
let sized_8_data: [u8; 8] = 1_u64.to_ne_bytes();
let res = unsafe { libc_utils::write_all(fd, sized_8_data.as_ptr() as *const libc::c_void, 8) };
assert_eq!(res, 8);
libc_utils::write_all(fd, &1_u64.to_ne_bytes()).unwrap();
// Create an epoll instance.
let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap();
@@ -435,9 +415,7 @@ fn test_socketpair_read() {
// Write a bunch of data bytes to fds[1].
let data = [42u8; 1024];
let res =
unsafe { libc_utils::write_all(fds[1], data.as_ptr() as *const libc::c_void, data.len()) };
assert_eq!(res, data.len() as isize);
libc_utils::write_all(fds[1], &data).unwrap();
// Two notification should be received.
let expected_event0 = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
@@ -451,9 +429,7 @@ fn test_socketpair_read() {
// Read some of the data from fds[0].
let mut buf = [0; 512];
let res =
unsafe { libc_utils::read_all(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
assert_eq!(res, buf.len() as isize);
libc_utils::read_exact(fds[0], &mut buf).unwrap();
// fds[1] did not change, it is still writable, so we get no event.
let expected_event = u32::try_from(libc::EPOLLOUT).unwrap();
@@ -462,9 +438,7 @@ fn test_socketpair_read() {
// Read until the buffer is empty.
let rest = data.len() - buf.len();
let res =
unsafe { libc_utils::read_all(fds[0], buf.as_mut_ptr().cast(), rest as libc::size_t) };
assert_eq!(res, rest as isize);
libc_utils::read_exact(fds[0], &mut buf[..rest]).unwrap();
// Now we get a notification that fds[1] can be written. This is spurious since it
// could already be written before, but Linux seems to always emit a notification for
@@ -481,7 +455,7 @@ fn test_no_notification_for_unregister_flag() {
let mut fds = [-1, -1];
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
// Register fd[0] with EPOLLOUT|EPOLLET.
// Register fds[0] with EPOLLOUT|EPOLLET.
let mut ev = libc::epoll_event {
events: (libc::EPOLLOUT | libc::EPOLLET).cast_unsigned(),
u64: u64::try_from(fds[0]).unwrap(),
@@ -489,12 +463,8 @@ fn test_no_notification_for_unregister_flag() {
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[0], &mut ev) };
assert_eq!(res, 0);
// Write to fd[1].
let data = b"abcde".as_ptr();
let res: i32 = unsafe {
libc_utils::write_all(fds[1], data as *const libc::c_void, 5).try_into().unwrap()
};
assert_eq!(res, 5);
// Write to fds[1].
libc_utils::write_all(fds[1], b"abcde").unwrap();
// Check result from epoll_wait. Since we didn't register EPOLLIN flag, the notification won't
// contain EPOLLIN even though fds[0] is now readable.
@@ -523,16 +493,14 @@ fn test_socketpair_epollerr() {
let mut fds = [-1, -1];
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
// Write to fd[0]
let data = b"abcde".as_ptr();
let res = unsafe { libc_utils::write_all(fds[0], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
// Write to fds[0]
libc_utils::write_all(fds[0], b"abcde").unwrap();
// Close fds[1].
// EPOLLERR will be triggered if we close peer fd that still has data in its read buffer.
errno_check(unsafe { libc::close(fds[1]) });
// Register fd[1] with EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP
// Register fds[1] with EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP
epoll_ctl_add(epfd, fds[0], libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET | libc::EPOLLRDHUP)
.unwrap();
@@ -671,9 +639,7 @@ fn test_issue_3858() {
errno_check(unsafe { libc::close(epfd) });
// Write to the eventfd instance.
let sized_8_data: [u8; 8] = 1_u64.to_ne_bytes();
let res = unsafe { libc_utils::write_all(fd, sized_8_data.as_ptr() as *const libc::c_void, 8) };
assert_eq!(res, 8);
libc_utils::write_all(fd, &1_u64.to_ne_bytes()).unwrap();
}
/// Ensure that if a socket becomes un-writable, we don't see it any more.
@@ -694,10 +660,9 @@ fn test_issue_4374() {
// Fill up fds[0] so that it is not writable any more.
let zeros = [0u8; 512];
loop {
let res = unsafe {
libc_utils::write_all(fds[0], zeros.as_ptr() as *const libc::c_void, zeros.len())
};
if res < 0 {
let res = libc_utils::write_all(fds[0], &zeros);
if let Err(err) = res {
assert_eq!(err.kind(), std::io::ErrorKind::WouldBlock);
break;
}
}
@@ -719,19 +684,13 @@ fn test_issue_4374_reads() {
assert_eq!(unsafe { libc::fcntl(fds[1], libc::F_SETFL, libc::O_NONBLOCK) }, 0);
// Write to fds[1] so that fds[0] becomes readable.
let data = b"abcde".as_ptr();
let res: i32 = unsafe {
libc_utils::write_all(fds[1], data as *const libc::c_void, 5).try_into().unwrap()
};
assert_eq!(res, 5);
libc_utils::write_all(fds[1], b"abcde").unwrap();
// Register fds[0] with epoll while it is readable.
epoll_ctl_add(epfd0, fds[0], libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET).unwrap();
// Read fds[0] so it is no longer readable.
let mut buf = [0u8; 512];
let res = unsafe { libc_utils::read_all(fds[0], buf.as_mut_ptr() as *mut libc::c_void, 5) };
assert_eq!(res, 5);
libc_utils::read_exact_array::<5>(fds[0]).unwrap();
// We should now still see a notification, but only about it being writable.
let expected_event = u32::try_from(libc::EPOLLOUT).unwrap();
@@ -221,8 +221,8 @@ fn test_dup_stdout_stderr() {
unsafe {
let new_stdout = libc::fcntl(1, libc::F_DUPFD, 0);
let new_stderr = libc::fcntl(2, libc::F_DUPFD, 0);
libc_utils::write_all(new_stdout, bytes.as_ptr() as *const libc::c_void, bytes.len());
libc_utils::write_all(new_stderr, bytes.as_ptr() as *const libc::c_void, bytes.len());
libc_utils::write_all(new_stdout, bytes).unwrap();
libc_utils::write_all(new_stderr, bytes).unwrap();
}
}
+14 -14
View File
@@ -31,19 +31,19 @@ fn test_pipe() {
// Read size == data available in buffer.
let data = b"12345";
write_all_from_slice(fds[1], data).unwrap();
let buf3 = read_all_into_array::<5>(fds[0]).unwrap();
write_all(fds[1], data).unwrap();
let buf3 = read_exact_array::<5>(fds[0]).unwrap();
assert_eq!(&buf3, data);
// Read size > data available in buffer.
let data = b"123";
write_all_from_slice(fds[1], data).unwrap();
write_all(fds[1], data).unwrap();
let mut buf4: [u8; 5] = [0; 5];
let (part1, rest) = read_into_slice(fds[0], &mut buf4).unwrap();
let (part1, rest) = read_split_slice(fds[0], &mut buf4).unwrap();
assert_eq!(part1[..], data[..part1.len()]);
// Write 2 more bytes so we can exactly fill the `rest`.
write_all_from_slice(fds[1], b"34").unwrap();
read_all_into_slice(fds[0], rest).unwrap();
write_all(fds[1], b"34").unwrap();
read_exact(fds[0], rest).unwrap();
}
fn test_pipe_threaded() {
@@ -51,19 +51,19 @@ fn test_pipe_threaded() {
errno_check(unsafe { libc::pipe(fds.as_mut_ptr()) });
let thread1 = thread::spawn(move || {
let buf = read_all_into_array::<5>(fds[0]).unwrap();
let buf = read_exact_array::<5>(fds[0]).unwrap();
assert_eq!(&buf, b"abcde");
});
thread::yield_now();
write_all_from_slice(fds[1], b"abcde").unwrap();
write_all(fds[1], b"abcde").unwrap();
thread1.join().unwrap();
// Read and write from different direction
let thread2 = thread::spawn(move || {
thread::yield_now();
write_all_from_slice(fds[1], b"12345").unwrap();
write_all(fds[1], b"12345").unwrap();
});
let buf = read_all_into_array::<5>(fds[0]).unwrap();
let buf = read_exact_array::<5>(fds[0]).unwrap();
assert_eq!(&buf, b"12345");
thread2.join().unwrap();
}
@@ -77,13 +77,13 @@ fn test_race() {
let thread1 = thread::spawn(move || {
// write() from the main thread will occur before the read() here
// because preemption is disabled and the main thread yields after write().
let buf = read_all_into_array::<1>(fds[0]).unwrap();
let buf = read_exact_array::<1>(fds[0]).unwrap();
assert_eq!(&buf, b"a");
// The read above establishes a happens-before so it is now safe to access this global variable.
unsafe { assert_eq!(VAL, 1) };
});
unsafe { VAL = 1 };
write_all_from_slice(fds[1], b"a").unwrap();
write_all(fds[1], b"a").unwrap();
thread::yield_now();
thread1.join().unwrap();
}
@@ -190,10 +190,10 @@ fn test_pipe_fcntl_threaded() {
// The write below will unblock the `read` in main thread: even though
// the socket is now "non-blocking", the shim needs to deal correctly
// with threads that were blocked before the socket was made non-blocking.
write_all_from_slice(fds[1], b"abcde").unwrap();
write_all(fds[1], b"abcde").unwrap();
});
// The `read` below will block.
let buf = read_all_into_array::<5>(fds[0]).unwrap();
let buf = read_exact_array::<5>(fds[0]).unwrap();
thread1.join().unwrap();
assert_eq!(&buf, b"abcde");
}
@@ -183,14 +183,7 @@ fn test_recv_nonblock() {
// Yield back to client so that it starts receiving before we start sending.
thread::sleep(Duration::from_millis(10));
unsafe {
errno_result(libc_utils::write_all(
peerfd,
TEST_BYTES.as_ptr().cast(),
TEST_BYTES.len(),
))
.unwrap()
};
libc_utils::write_all(peerfd, TEST_BYTES).unwrap();
});
net::connect_ipv4(client_sockfd, addr).unwrap();
@@ -324,12 +317,12 @@ fn test_send_nonblock() {
let mut buffer = Vec::with_capacity(total_written / 2);
buffer.fill(0u8);
unsafe {
errno_result(libc_utils::read_all_generic(
libc_utils::read_exact_generic(
buffer.as_mut_ptr().cast(),
total_written / 2,
libc_utils::NoRetry,
|buf, count| libc::recv(peerfd, buf, count, 0),
))
)
.unwrap()
};
});
@@ -267,12 +267,12 @@ fn test_send_recv_nonblock() {
thread::sleep(Duration::from_millis(10));
unsafe {
errno_result(libc_utils::write_all_generic(
libc_utils::write_all_generic(
TEST_BYTES.as_ptr().cast(),
TEST_BYTES.len(),
libc_utils::NoRetry,
|buf, count| libc::send(peerfd, buf, count, 0),
))
)
.unwrap()
};
@@ -280,12 +280,12 @@ fn test_send_recv_nonblock() {
// This will block until the client sent us this data.
let mut buffer = [0; TEST_BYTES.len()];
unsafe {
errno_result(libc_utils::read_all_generic(
libc_utils::read_exact_generic(
buffer.as_mut_ptr().cast(),
buffer.len(),
libc_utils::NoRetry,
|buf, count| libc::recv(peerfd, buf, count, 0),
))
)
.unwrap()
};
assert_eq!(&buffer, TEST_BYTES);
@@ -314,12 +314,12 @@ fn test_send_recv_nonblock() {
// sleep multiple times until we received everything.
unsafe {
errno_result(libc_utils::read_all_generic(
libc_utils::read_exact_generic(
buffer.as_mut_ptr().cast(),
buffer.len(),
libc_utils::RetryAfter(Duration::from_millis(10)),
|buf, count| libc::recv(client_sockfd, buf, count, 0),
))
)
.unwrap()
};
assert_eq!(&buffer, TEST_BYTES);
@@ -328,12 +328,12 @@ fn test_send_recv_nonblock() {
// Sending into the empty buffer should succeed without blocking.
unsafe {
errno_result(libc_utils::write_all_generic(
libc_utils::write_all_generic(
TEST_BYTES.as_ptr().cast(),
TEST_BYTES.len(),
libc_utils::NoRetry,
|buf, count| libc::send(client_sockfd, buf, count, 0),
))
)
.unwrap()
};
@@ -345,12 +345,12 @@ fn test_send_recv_nonblock() {
let fill_buf = [1u8; 5_000_000];
// This fills the socket receive buffer and thus should start blocking.
let err = unsafe {
errno_result(libc_utils::write_all_generic(
libc_utils::write_all_generic(
fill_buf.as_ptr().cast(),
fill_buf.len(),
libc_utils::NoRetry,
|buf, count| libc::send(client_sockfd, buf, count, 0),
))
)
.unwrap_err()
};
assert_eq!(err.kind(), ErrorKind::WouldBlock)
@@ -383,12 +383,12 @@ fn test_send_recv_dontwait() {
thread::sleep(Duration::from_millis(10));
unsafe {
errno_result(libc_utils::write_all_generic(
libc_utils::write_all_generic(
TEST_BYTES.as_ptr().cast(),
TEST_BYTES.len(),
libc_utils::NoRetry,
|buf, count| libc::send(peerfd, buf, count, 0),
))
)
.unwrap()
};
@@ -396,12 +396,12 @@ fn test_send_recv_dontwait() {
// This will block until the client sent us this data.
let mut buffer = [0; TEST_BYTES.len()];
unsafe {
errno_result(libc_utils::read_all_generic(
libc_utils::read_exact_generic(
buffer.as_mut_ptr().cast(),
buffer.len(),
libc_utils::NoRetry,
|buf, count| libc::recv(peerfd, buf, count, 0),
))
)
.unwrap()
};
assert_eq!(&buffer, TEST_BYTES);
@@ -430,12 +430,12 @@ fn test_send_recv_dontwait() {
// sleep multiple times until we received everything.
unsafe {
errno_result(libc_utils::read_all_generic(
libc_utils::read_exact_generic(
buffer.as_mut_ptr().cast(),
buffer.len(),
libc_utils::RetryAfter(Duration::from_millis(10)),
|buf, count| libc::recv(client_sockfd, buf, count, libc::MSG_DONTWAIT),
))
)
.unwrap()
};
assert_eq!(&buffer, TEST_BYTES);
@@ -444,12 +444,12 @@ fn test_send_recv_dontwait() {
// Sending into the empty buffer should succeed without blocking.
unsafe {
errno_result(libc_utils::write_all_generic(
libc_utils::write_all_generic(
TEST_BYTES.as_ptr().cast(),
TEST_BYTES.len(),
libc_utils::NoRetry,
|buf, count| libc::send(client_sockfd, buf, count, libc::MSG_DONTWAIT),
))
)
.unwrap()
};
@@ -461,12 +461,12 @@ fn test_send_recv_dontwait() {
let fill_buf = [1u8; 5_000_000];
// This fills the socket receive buffer and thus should start blocking.
let err = unsafe {
errno_result(libc_utils::write_all_generic(
libc_utils::write_all_generic(
fill_buf.as_ptr().cast(),
fill_buf.len(),
libc_utils::NoRetry,
|buf, count| libc::send(client_sockfd, buf, count, libc::MSG_DONTWAIT),
))
)
.unwrap_err()
};
assert_eq!(err.kind(), ErrorKind::WouldBlock)
@@ -489,23 +489,12 @@ fn test_write_read_nonblock() {
// Yield back to client so that it starts receiving before we start sending.
thread::sleep(Duration::from_millis(10));
let bytes_written = unsafe {
errno_result(libc_utils::write_all(
peerfd,
TEST_BYTES.as_ptr().cast(),
TEST_BYTES.len(),
))
.unwrap()
};
assert_eq!(bytes_written as usize, TEST_BYTES.len());
libc_utils::write_all(peerfd, TEST_BYTES).unwrap();
// The buffer should contain `TEST_BYTES` at the beginning.
// This will block until the client sent us this data.
let mut buffer = [0; TEST_BYTES.len()];
unsafe {
errno_result(libc_utils::read_all(peerfd, buffer.as_mut_ptr().cast(), buffer.len()))
.unwrap()
};
libc_utils::read_exact(peerfd, &mut buffer).unwrap();
assert_eq!(&buffer, TEST_BYTES);
});
@@ -536,12 +525,12 @@ fn test_write_read_nonblock() {
// sleep multiple times until we read everything.
unsafe {
errno_result(libc_utils::read_all_generic(
libc_utils::read_exact_generic(
buffer.as_mut_ptr().cast(),
buffer.len(),
libc_utils::RetryAfter(Duration::from_millis(10)),
|buf, count| libc::read(client_sockfd, buf, count),
))
)
.unwrap()
};
assert_eq!(&buffer, TEST_BYTES);
@@ -549,15 +538,7 @@ fn test_write_read_nonblock() {
// Now we test non-blocking writing.
// Writing into the empty buffer should succeed without blocking.
let bytes_written = unsafe {
errno_result(libc_utils::write_all(
client_sockfd,
TEST_BYTES.as_ptr().cast(),
TEST_BYTES.len(),
))
.unwrap()
};
assert_eq!(bytes_written as usize, TEST_BYTES.len());
libc_utils::write_all(client_sockfd, TEST_BYTES).unwrap();
if !cfg!(windows_host) {
// Keep sending data until the buffer is full and we block.
@@ -567,12 +548,12 @@ fn test_write_read_nonblock() {
let fill_buf = [1u8; 5_000_000];
// This fills the socket receive buffer and thus should start blocking.
let err = unsafe {
errno_result(libc_utils::write_all_generic(
libc_utils::write_all_generic(
fill_buf.as_ptr().cast(),
fill_buf.len(),
libc_utils::NoRetry,
|buf, count| libc::write(client_sockfd, buf, count),
))
)
.unwrap_err()
};
assert_eq!(err.kind(), ErrorKind::WouldBlock)
@@ -271,12 +271,12 @@ fn test_send_peek_recv() {
// Write the bytes into the stream.
unsafe {
errno_result(libc_utils::write_all_generic(
libc_utils::write_all_generic(
TEST_BYTES.as_ptr().cast(),
TEST_BYTES.len(),
libc_utils::NoRetry,
|buf, count| libc::send(peerfd, buf, count, 0),
))
)
.unwrap()
};
});
@@ -303,12 +303,12 @@ fn test_send_peek_recv() {
let mut buffer = [0; TEST_BYTES.len()];
unsafe {
errno_result(libc_utils::read_all_generic(
libc_utils::read_exact_generic(
buffer.as_mut_ptr().cast(),
buffer.len(),
libc_utils::NoRetry,
|buf, count| libc::recv(client_sockfd, buf, count, 0),
))
)
.unwrap()
};
assert_eq!(&buffer, TEST_BYTES);
@@ -330,24 +330,13 @@ fn test_write_read() {
let (peerfd, _) = net::accept_ipv4(server_sockfd).unwrap();
// Write some bytes into the stream.
let bytes_written = unsafe {
errno_result(libc_utils::write_all(
peerfd,
TEST_BYTES.as_ptr().cast(),
TEST_BYTES.len(),
))
.unwrap()
};
assert_eq!(bytes_written as usize, TEST_BYTES.len());
libc_utils::write_all(peerfd, TEST_BYTES).unwrap();
});
net::connect_ipv4(client_sockfd, addr).unwrap();
let mut buffer = [0; TEST_BYTES.len()];
unsafe {
errno_result(libc_utils::read_all(client_sockfd, buffer.as_mut_ptr().cast(), buffer.len()))
.unwrap()
};
libc_utils::read_exact(client_sockfd, &mut buffer).unwrap();
assert_eq!(&buffer, TEST_BYTES);
server_thread.join().unwrap();
@@ -26,46 +26,50 @@ fn test_socketpair() {
// Read size == data available in buffer.
let data = b"abcde";
write_all_from_slice(fds[0], data).unwrap();
let buf = read_all_into_array::<5>(fds[1]).unwrap();
write_all(fds[0], data).unwrap();
let buf = read_exact_array::<5>(fds[1]).unwrap();
assert_eq!(&buf, data);
// Read size > data available in buffer.
let data = b"abc";
write_all_from_slice(fds[0], data).unwrap();
write_all(fds[0], data).unwrap();
let mut buf2: [u8; 5] = [0; 5];
let (read, rest) = read_into_slice(fds[1], &mut buf2).unwrap();
let (read, rest) = read_split_slice(fds[1], &mut buf2).unwrap();
assert_eq!(read[..], data[..read.len()]);
// Write 2 more bytes so we can exactly fill the `rest`.
write_all_from_slice(fds[0], b"12").unwrap();
read_all_into_slice(fds[1], rest).unwrap();
write_all(fds[0], b"12").unwrap();
read_exact(fds[1], rest).unwrap();
assert_eq!(&buf2, b"abc12");
// Test read and write from another direction.
// Read size == data available in buffer.
let data = b"12345";
write_all_from_slice(fds[1], data).unwrap();
let buf3 = read_all_into_array::<5>(fds[0]).unwrap();
write_all(fds[1], data).unwrap();
let buf3 = read_exact_array::<5>(fds[0]).unwrap();
assert_eq!(&buf3, data);
// Read size > data available in buffer.
let data = b"123";
write_all_from_slice(fds[1], data).unwrap();
let data = b"abc";
write_all(fds[1], data).unwrap();
let mut buf4: [u8; 5] = [0; 5];
let (read, rest) = read_into_slice(fds[0], &mut buf4).unwrap();
let (read, rest) = read_split_slice(fds[0], &mut buf4).unwrap();
assert_eq!(read[..], data[..read.len()]);
// Write 2 more bytes so we can exactly fill the `rest`.
write_all_from_slice(fds[1], b"12").unwrap();
read_all_into_slice(fds[0], rest).unwrap();
write_all(fds[1], b"12").unwrap();
read_exact(fds[0], rest).unwrap();
assert_eq!(&buf4, b"abc12");
// Test when happens when we close one end, with some data in the buffer.
write_all_from_slice(fds[0], data).unwrap();
write_all(fds[0], data).unwrap();
errno_check(unsafe { libc::close(fds[0]) });
// Reading the other end should return that data, then EOF.
let mut buf: [u8; 5] = [0; 5];
let (res, _) = read_until_eof_into_slice(fds[1], &mut buf).unwrap();
assert_eq!(res, data);
let (read, _tail) = read_split_slice(fds[1], &mut buf).unwrap();
assert_eq!(read, data);
let (read, _tail) = read_split_slice(fds[1], &mut buf).unwrap();
assert_eq!(read, &[]);
// Writing the other end should emit EPIPE.
let err = write_all_from_slice(fds[1], &mut buf).unwrap_err();
let err = write_all(fds[1], &mut buf).unwrap_err();
assert_eq!(err.raw_os_error(), Some(libc::EPIPE));
}
@@ -74,19 +78,19 @@ fn test_socketpair_threaded() {
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
let thread1 = thread::spawn(move || {
let buf = read_all_into_array::<5>(fds[1]).unwrap();
let buf = read_exact_array::<5>(fds[1]).unwrap();
assert_eq!(&buf, b"abcde");
});
thread::yield_now();
write_all_from_slice(fds[0], b"abcde").unwrap();
write_all(fds[0], b"abcde").unwrap();
thread1.join().unwrap();
// Read and write from different direction
let thread2 = thread::spawn(move || {
thread::yield_now();
write_all_from_slice(fds[1], b"12345").unwrap();
write_all(fds[1], b"12345").unwrap();
});
let buf = read_all_into_array::<5>(fds[0]).unwrap();
let buf = read_exact_array::<5>(fds[0]).unwrap();
assert_eq!(&buf, b"12345");
thread2.join().unwrap();
}
@@ -98,13 +102,13 @@ fn test_race() {
let thread1 = thread::spawn(move || {
// write() from the main thread will occur before the read() here
// because preemption is disabled and the main thread yields after write().
let buf = read_all_into_array::<1>(fds[1]).unwrap();
let buf = read_exact_array::<1>(fds[1]).unwrap();
assert_eq!(&buf, b"a");
// The read above establishes a happens-before so it is now safe to access this global variable.
unsafe { assert_eq!(VAL, 1) };
});
unsafe { VAL = 1 };
write_all_from_slice(fds[0], b"a").unwrap();
write_all(fds[0], b"a").unwrap();
thread::yield_now();
thread1.join().unwrap();
}
@@ -115,12 +119,12 @@ fn test_blocking_read() {
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
let thread1 = thread::spawn(move || {
// Let this thread block on read.
let buf = read_all_into_array::<3>(fds[1]).unwrap();
let buf = read_exact_array::<3>(fds[1]).unwrap();
assert_eq!(&buf, b"abc");
});
let thread2 = thread::spawn(move || {
// Unblock thread1 by doing writing something.
write_all_from_slice(fds[0], b"abc").unwrap();
write_all(fds[0], b"abc").unwrap();
});
thread1.join().unwrap();
thread2.join().unwrap();
@@ -132,14 +136,14 @@ fn test_blocking_write() {
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });
let arr1: [u8; 0x34000] = [1; 0x34000];
// Exhaust the space in the buffer so the subsequent write will block.
write_all_from_slice(fds[0], &arr1).unwrap();
write_all(fds[0], &arr1).unwrap();
let thread1 = thread::spawn(move || {
// The write below will be blocked because the buffer is already full.
write_all_from_slice(fds[0], b"abc").unwrap();
write_all(fds[0], b"abc").unwrap();
});
let thread2 = thread::spawn(move || {
// Unblock thread1 by freeing up some space.
let buf = read_all_into_array::<3>(fds[1]).unwrap();
let buf = read_exact_array::<3>(fds[1]).unwrap();
assert_eq!(buf, [1, 1, 1]);
});
thread1.join().unwrap();
+28 -49
View File
@@ -32,12 +32,14 @@ pub fn errno_check<T: From<i8> + Ord + fmt::Debug>(ret: T) {
}
/// Invoke the `read` function until `buf` is full. `retry` contols the behavior on EAGAIN.
pub unsafe fn read_all_generic(
/// Panics if we get EOF before the buffer is filled.
#[track_caller]
pub unsafe fn read_exact_generic(
buf: *mut libc::c_void,
count: libc::size_t,
retry: Retry,
read: impl Fn(*mut libc::c_void, libc::size_t) -> libc::ssize_t,
) -> libc::ssize_t {
) -> io::Result<()> {
assert!(count > 0);
let mut read_so_far = 0;
while read_so_far < count {
@@ -50,68 +52,52 @@ pub unsafe fn read_all_generic(
continue;
}
}
return res;
return Err(io::Error::last_os_error());
}
if res == 0 {
// EOF
break;
// We expected more data but got EOF.
panic!(
"could not fill buffer with {count} bytes: EOF received after {read_so_far} bytes"
);
}
read_so_far += res as libc::size_t;
}
return read_so_far as libc::ssize_t;
}
/// Read from `fd` until `buf` is full. Abort on first error.
pub unsafe fn read_all(
fd: libc::c_int,
buf: *mut libc::c_void,
count: libc::size_t,
) -> libc::ssize_t {
read_all_generic(buf, count, NoRetry, |buf, count| libc::read(fd, buf, count))
Ok(())
}
/// Try to fill the given slice by reading from `fd`. Panic if that many bytes could not be read.
#[track_caller]
pub fn read_all_into_slice(fd: libc::c_int, buf: &mut [u8]) -> io::Result<()> {
let res = errno_result(unsafe { read_all(fd, buf.as_mut_ptr().cast(), buf.len()) })?;
assert_eq!(res as usize, buf.len());
Ok(())
pub fn read_exact(fd: libc::c_int, buf: &mut [u8]) -> io::Result<()> {
unsafe {
read_exact_generic(buf.as_mut_ptr().cast(), buf.len(), NoRetry, |buf, count| {
libc::read(fd, buf, count)
})
}
}
/// Read exactly `N` bytes from `fd`. Error if that many bytes could not be read.
#[track_caller]
pub fn read_all_into_array<const N: usize>(fd: libc::c_int) -> io::Result<[u8; N]> {
pub fn read_exact_array<const N: usize>(fd: libc::c_int) -> io::Result<[u8; N]> {
let mut buf = [0; N];
read_all_into_slice(fd, &mut buf)?;
read_exact(fd, &mut buf)?;
Ok(buf)
}
/// Do a single read from `fd` and return the part of the buffer that was written into,
/// and the rest.
#[track_caller]
pub fn read_into_slice(fd: libc::c_int, buf: &mut [u8]) -> io::Result<(&mut [u8], &mut [u8])> {
pub fn read_split_slice(fd: libc::c_int, buf: &mut [u8]) -> io::Result<(&mut [u8], &mut [u8])> {
let res = errno_result(unsafe { libc::read(fd, buf.as_mut_ptr().cast(), buf.len()) })?;
Ok(buf.split_at_mut(res as usize))
}
/// Read from `fd` until we get EOF and return the part of the buffer that was written into,
/// and the rest.
#[track_caller]
pub fn read_until_eof_into_slice(
fd: libc::c_int,
buf: &mut [u8],
) -> io::Result<(&mut [u8], &mut [u8])> {
let res = errno_result(unsafe { read_all(fd, buf.as_mut_ptr().cast(), buf.len()) })?;
Ok(buf.split_at_mut(res as usize))
}
/// Invoke the `write` function until `buf` is full. `retry` controls the behavior on EAGAIN.
pub unsafe fn write_all_generic(
buf: *const libc::c_void,
count: libc::size_t,
retry: Retry,
write: impl Fn(*const libc::c_void, libc::size_t) -> libc::ssize_t,
) -> libc::ssize_t {
) -> io::Result<()> {
assert!(count > 0);
let mut written_so_far = 0;
while written_so_far < count {
@@ -124,29 +110,22 @@ pub unsafe fn write_all_generic(
continue;
}
}
return res;
return Err(io::Error::last_os_error());
}
// Apparently a return value of 0 is just a short write, nothing special (unlike reads).
written_so_far += res as libc::size_t;
}
return written_so_far as libc::ssize_t;
}
/// Write to `fd` until `buf` is fully written. Abort on first error.
pub unsafe fn write_all(
fd: libc::c_int,
buf: *const libc::c_void,
count: libc::size_t,
) -> libc::ssize_t {
write_all_generic(buf, count, NoRetry, |buf, count| libc::write(fd, buf, count))
return Ok(());
}
/// Write the entire `buf` to `fd`. Panic if not all bytes could be written.
#[track_caller]
pub fn write_all_from_slice(fd: libc::c_int, buf: &[u8]) -> io::Result<()> {
let res = errno_result(unsafe { write_all(fd, buf.as_ptr().cast(), buf.len()) })?;
assert_eq!(res as usize, buf.len());
Ok(())
pub fn write_all(fd: libc::c_int, buf: &[u8]) -> io::Result<()> {
unsafe {
write_all_generic(buf.as_ptr().cast(), buf.len(), NoRetry, |buf, count| {
libc::write(fd, buf, count)
})
}
}
#[cfg(any(target_os = "linux", target_os = "android", target_os = "illumos"))]