Andrew Kelley
2025-11-21 08:04:52 -08:00
parent c91dec3b6f
commit c6e7c2cb80
3 changed files with 818 additions and 557 deletions
lib/std/Io.zig -5
@@ -633,11 +633,6 @@ pub const VTable = struct {
result: []u8,
result_alignment: std.mem.Alignment,
) void,
/// Returns whether the current thread of execution is known to have
/// been requested to cancel.
///
/// Thread-safe.
cancelRequested: *const fn (?*anyopaque) bool,
/// Executes `start` asynchronously in a manner such that it cleans itself
/// up. This mode does not support results, await, or cancel.
lib/std/Io/Threaded.zig +812 -239
@@ -12,22 +12,16 @@ const Io = std.Io;
const net = std.Io.net;
const HostName = std.Io.net.HostName;
const IpAddress = std.Io.net.IpAddress;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;
const assert = std.debug.assert;
const posix = std.posix;
/// Thread-safe.
allocator: Allocator,
mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
run_queue: std.SinglyLinkedList = .{},
join_requested: bool = false,
threads: std.ArrayList(std.Thread),
stack_size: usize,
thread_capacity: std.atomic.Value(ThreadCapacity),
thread_capacity_error: ?std.Thread.CpuCountError,
concurrent_count: usize,
main_thread: Thread,
stack_size: usize = default_stack_size,
capacity: std.atomic.Value(Capacity),
capacity_error: ?std.Thread.CpuCountError,
concurrent_limit: Io.Limit = .unlimited,
pid: Pid = .unknown,
wsa: if (is_windows) Wsa else struct {} = .{},
@@ -35,22 +29,626 @@ have_signal_handler: bool,
old_sig_io: if (have_sig_io) posix.Sigaction else void,
old_sig_pipe: if (have_sig_pipe) posix.Sigaction else void,
pub const ThreadCapacity = enum(usize) {
pub const Pid = enum(if (posix.pid_t == void) u0 else posix.pid_t) {
unknown = 0,
_,
};
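// Note: 0 is never a valid process id, so it doubles as the "unknown"
// sentinel; `Closure.requestCancel` lazily caches the result of `getpid()`
// here so that repeated cancellations avoid the extra syscall.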
pub fn init(n: usize) ThreadCapacity {
assert(n != 0);
pub const Thread = struct {
/// The value that needs to be passed to pthread_kill or tgkill in order to
/// send a signal.
signal_id: SignalId,
/// Points to the next thread in the list. Singly-linked so that
/// it can be updated lock-free.
list_node: std.SinglyLinkedList.Node = .{},
run_queue: std.SinglyLinkedList = .{},
current_closure: ?*Closure = null,
completion: Completion,
mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
join_requested: bool = false,
threadlocal var current: *Thread = undefined;
const SignalId = if (use_pthreads) std.c.pthread_t else std.Thread.Id;
const Completion = switch (native_os) {
.windows => @compileError("TODO"),
.linux => struct {
state: State = State.init(.running),
child_tid: std.atomic.Value(i32) = std.atomic.Value(i32).init(1),
parent_tid: i32 = undefined,
mapped: []align(std.heap.page_size_min) u8,
/// Offset into `mapped` of the end of the stack region; used as the
/// initial stack pointer by `spawn`.
stack_offset: usize,
/// Value passed as the TLS argument to `clone`.
tls_ptr: usize,
/// State synchronizing detachment between the spawner thread and the spawned thread.
const State = std.atomic.Value(enum(switch (builtin.zig_backend) {
.stage2_riscv64 => u32,
else => u8,
}) {
running,
detached,
completed,
});
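// A sketch of the intended handshake: the exiting thread swaps in
// `.completed` (see `linuxStart`), while a spawner that abandons the
// thread would swap in `.detached`; whichever side observes the other's
// value owns the mapping, and the exiting side releases it via
// `freeAndExit`.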
/// Calls `munmap(mapped.ptr, mapped.len)` then `exit(0)` without touching the stack (which lives in `mapped.ptr`).
/// Ported over from musl libc's pthread detached implementation:
/// https://github.com/ifduyue/musl/search?q=__unmapself
fn freeAndExit(self: *Completion) noreturn {
switch (builtin.target.cpu.arch) {
.x86 => asm volatile (
\\ movl $91, %%eax # SYS_munmap
\\ movl %[ptr], %%ebx
\\ movl %[len], %%ecx
\\ int $128
\\ movl $1, %%eax # SYS_exit
\\ movl $0, %%ebx
\\ int $128
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.x86_64 => asm volatile (switch (builtin.target.abi) {
.gnux32, .muslx32 =>
\\ movl $0x4000000b, %%eax # SYS_munmap
\\ syscall
\\ movl $0x4000003c, %%eax # SYS_exit
\\ xor %%rdi, %%rdi
\\ syscall
,
else =>
\\ movl $11, %%eax # SYS_munmap
\\ syscall
\\ movl $60, %%eax # SYS_exit
\\ xor %%rdi, %%rdi
\\ syscall
,
}
:
: [ptr] "{rdi}" (@intFromPtr(self.mapped.ptr)),
[len] "{rsi}" (self.mapped.len),
),
.arm, .armeb, .thumb, .thumbeb => asm volatile (
\\ mov r7, #91 // SYS_munmap
\\ mov r0, %[ptr]
\\ mov r1, %[len]
\\ svc 0
\\ mov r7, #1 // SYS_exit
\\ mov r0, #0
\\ svc 0
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.aarch64, .aarch64_be => asm volatile (
\\ mov x8, #215 // SYS_munmap
\\ mov x0, %[ptr]
\\ mov x1, %[len]
\\ svc 0
\\ mov x8, #93 // SYS_exit
\\ mov x0, #0
\\ svc 0
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.alpha => asm volatile (
\\ ldi $0, 73 # SYS_munmap
\\ mov %[ptr], $16
\\ mov %[len], $17
\\ callsys
\\ ldi $0, 1 # SYS_exit
\\ ldi $16, 0
\\ callsys
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.hexagon => asm volatile (
\\ r6 = #215 // SYS_munmap
\\ r0 = %[ptr]
\\ r1 = %[len]
\\ trap0(#1)
\\ r6 = #93 // SYS_exit
\\ r0 = #0
\\ trap0(#1)
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.hppa => asm volatile (
\\ ldi 91, %%r20 /* SYS_munmap */
\\ copy %[ptr], %%r26
\\ copy %[len], %%r25
\\ ble 0x100(%%sr2, %%r0)
\\ ldi 1, %%r20 /* SYS_exit */
\\ ldi 0, %%r26
\\ ble 0x100(%%sr2, %%r0)
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.m68k => asm volatile (
\\ move.l #91, %%d0 // SYS_munmap
\\ move.l %[ptr], %%d1
\\ move.l %[len], %%d2
\\ trap #0
\\ move.l #1, %%d0 // SYS_exit
\\ move.l #0, %%d1
\\ trap #0
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.microblaze, .microblazeel => asm volatile (
\\ ori r12, r0, 91 # SYS_munmap
\\ ori r5, %[ptr], 0
\\ ori r6, %[len], 0
\\ brki r14, 0x8
\\ ori r12, r0, 1 # SYS_exit
\\ or r5, r0, r0
\\ brki r14, 0x8
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
// We set `sp` to the address of the current function as a workaround for a Linux
// kernel bug that caused syscalls to return EFAULT if the stack pointer is invalid.
// The bug was introduced in 46e12c07b3b9603c60fc1d421ff18618241cb081 and fixed in
// 7928eb0370d1133d0d8cd2f5ddfca19c309079d5.
.mips, .mipsel => asm volatile (
\\ move $sp, $t9
\\ li $v0, 4091 # SYS_munmap
\\ move $a0, %[ptr]
\\ move $a1, %[len]
\\ syscall
\\ li $v0, 4001 # SYS_exit
\\ li $a0, 0
\\ syscall
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.mips64, .mips64el => asm volatile (switch (builtin.target.abi) {
.gnuabin32, .muslabin32 =>
\\ li $v0, 6011 # SYS_munmap
\\ move $a0, %[ptr]
\\ move $a1, %[len]
\\ syscall
\\ li $v0, 6058 # SYS_exit
\\ li $a0, 0
\\ syscall
,
else =>
\\ li $v0, 5011 # SYS_munmap
\\ move $a0, %[ptr]
\\ move $a1, %[len]
\\ syscall
\\ li $v0, 5058 # SYS_exit
\\ li $a0, 0
\\ syscall
,
}
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.or1k => asm volatile (
\\ l.ori r11, r0, 215 # SYS_munmap
\\ l.ori r3, %[ptr]
\\ l.ori r4, %[len]
\\ l.sys 1
\\ l.ori r11, r0, 93 # SYS_exit
\\ l.ori r3, r0, r0
\\ l.sys 1
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.powerpc, .powerpcle, .powerpc64, .powerpc64le => asm volatile (
\\ li 0, 91 # SYS_munmap
\\ mr 3, %[ptr]
\\ mr 4, %[len]
\\ sc
\\ li 0, 1 # SYS_exit
\\ li 3, 0
\\ sc
\\ blr
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.riscv32, .riscv64 => asm volatile (
\\ li a7, 215 # SYS_munmap
\\ mv a0, %[ptr]
\\ mv a1, %[len]
\\ ecall
\\ li a7, 93 # SYS_exit
\\ mv a0, zero
\\ ecall
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.s390x => asm volatile (
\\ lgr %%r2, %[ptr]
\\ lgr %%r3, %[len]
\\ svc 91 # SYS_munmap
\\ lghi %%r2, 0
\\ svc 1 # SYS_exit
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.sh, .sheb => asm volatile (
\\ mov #91, r3 ! SYS_munmap
\\ mov %[ptr], r4
\\ mov %[len], r5
\\ trapa #31
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ mov #1, r3 ! SYS_exit
\\ mov #0, r4
\\ trapa #31
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.sparc => asm volatile (
\\ # See sparc64 comments below.
\\ 1:
\\ cmp %%fp, 0
\\ beq 2f
\\ nop
\\ ba 1b
\\ restore
\\ 2:
\\ mov 73, %%g1 // SYS_munmap
\\ mov %[ptr], %%o0
\\ mov %[len], %%o1
\\ t 0x3 # ST_FLUSH_WINDOWS
\\ t 0x10
\\ mov 1, %%g1 // SYS_exit
\\ mov 0, %%o0
\\ t 0x10
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.sparc64 => asm volatile (
\\ # SPARCs really don't like it when active stack frames
\\ # are unmapped (it results in a segfault), so we
\\ # force-deactivate it by running `restore` until
\\ # all frames are cleared.
\\ 1:
\\ cmp %%fp, 0
\\ beq 2f
\\ nop
\\ ba 1b
\\ restore
\\ 2:
\\ mov 73, %%g1 // SYS_munmap
\\ mov %[ptr], %%o0
\\ mov %[len], %%o1
\\ # Flush register window contents to prevent background
\\ # memory access before unmapping the stack.
\\ flushw
\\ t 0x6d
\\ mov 1, %%g1 // SYS_exit
\\ mov 0, %%o0
\\ t 0x6d
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.loongarch32, .loongarch64 => asm volatile (
\\ or $a0, $zero, %[ptr]
\\ or $a1, $zero, %[len]
\\ ori $a7, $zero, 215 # SYS_munmap
\\ syscall 0 # call munmap
\\ ori $a0, $zero, 0
\\ ori $a7, $zero, 93 # SYS_exit
\\ syscall 0 # call exit
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
else => |cpu_arch| @compileError("Unsupported linux arch: " ++ @tagName(cpu_arch)),
}
unreachable;
}
},
else => void,
};
const AllocateError = error{OutOfMemory};
fn allocate(stack_size: usize) AllocateError!*Thread {
if (use_pthreads) {
@compileError("TODO");
} else if (is_windows) {
@compileError("TODO");
} else if (native_os == .linux) {
const linux = std.os.linux;
const page_size = std.heap.pageSize();
var guard_offset: usize = undefined;
var stack_offset: usize = undefined;
var tls_offset: usize = undefined;
var instance_offset: usize = undefined;
const map_bytes = blk: {
var bytes: usize = page_size;
guard_offset = bytes;
bytes += @max(page_size, stack_size);
bytes = std.mem.alignForward(usize, bytes, page_size);
stack_offset = bytes;
bytes = std.mem.alignForward(usize, bytes, linux.tls.area_desc.alignment);
tls_offset = bytes;
bytes += linux.tls.area_desc.size;
bytes = std.mem.alignForward(usize, bytes, @alignOf(Thread));
instance_offset = bytes;
bytes += @sizeOf(Thread);
bytes = std.mem.alignForward(usize, bytes, page_size);
break :blk bytes;
};
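// Resulting layout of the mapping, as computed above (a sketch):
//
//   [ guard page | stack | TLS area | Thread instance ]
//   0            ^guard_offset      ^tls_offset        ^instance_offset
//
// `stack_offset` is the end of the stack region; `spawn` uses it as the
// initial stack pointer, since the stack grows downward.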
// Map all memory needed without read/write permissions to avoid
// committing the whole region right away. Anonymous mapping ensures
// file descriptor limits are not exceeded.
const mapped = posix.mmap(
null,
map_bytes,
posix.PROT.NONE,
.{ .TYPE = .PRIVATE, .ANONYMOUS = true },
-1,
0,
) catch |err| switch (err) {
error.MemoryMappingNotSupported => unreachable,
error.AccessDenied => unreachable,
error.PermissionDenied => unreachable,
error.ProcessFdQuotaExceeded => unreachable,
error.SystemFdQuotaExceeded => unreachable,
error.MappingAlreadyExists => unreachable,
else => |e| return e,
};
assert(mapped.len >= map_bytes);
errdefer posix.munmap(mapped);
// map everything but the guard page as read/write
posix.mprotect(
@alignCast(mapped[guard_offset..]),
posix.PROT.READ | posix.PROT.WRITE,
) catch |err| switch (err) {
error.AccessDenied => unreachable,
else => |e| return e,
};
// Prepare the TLS area, and a user_desc struct when needed on x86.
var tls_ptr = linux.tls.prepareArea(mapped[tls_offset..]);
var user_desc: if (builtin.target.cpu.arch == .x86) linux.user_desc else void = undefined;
if (builtin.target.cpu.arch == .x86) {
defer tls_ptr = @intFromPtr(&user_desc);
user_desc = .{
.entry_number = linux.tls.area_desc.gdt_entry_number,
.base_addr = tls_ptr,
.limit = 0xfffff,
.flags = .{
.seg_32bit = 1,
.contents = 0, // Data
.read_exec_only = 0,
.limit_in_pages = 1,
.seg_not_present = 0,
.useable = 1,
},
};
}
const instance: *Thread = @ptrCast(@alignCast(&mapped[instance_offset]));
instance.* = .{
.signal_id = undefined, // Initialized on spawn.
.completion = .{
.mapped = mapped,
.stack_offset = stack_offset,
.tls_ptr = tls_ptr,
},
};
return instance;
} else {
@compileError("unimplemented");
}
}
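// Intended pairing, per the TODOs in `async` below (a sketch, not yet
// wired up in this commit):
//
//   const new_thread = try Thread.allocate(t.stack_size);
//   try new_thread.spawn();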
const SpawnError = error{
ThreadQuotaExceeded,
SystemResources,
Unexpected,
};
fn spawn(thread: *Thread) SpawnError!void {
if (use_pthreads) {
const c = std.c;
const stack_size = {}; // TODO
var attr: c.pthread_attr_t = undefined;
if (c.pthread_attr_init(&attr) != .SUCCESS) return error.SystemResources;
defer assert(c.pthread_attr_destroy(&attr) == .SUCCESS);
assert(c.pthread_attr_setstacksize(&attr, stack_size) == .SUCCESS);
assert(c.pthread_attr_setguardsize(&attr, std.heap.pageSize()) == .SUCCESS);
var handle: c.pthread_t = undefined;
switch (c.pthread_create(
&handle,
&attr,
posixStart,
@ptrCast(thread),
)) {
.SUCCESS => {
thread.signal_id = handle;
return;
},
.AGAIN => return error.SystemResources,
.PERM => unreachable,
.INVAL => unreachable,
else => |err| return posix.unexpectedErrno(err),
}
@compileError("TODO");
} else if (is_windows) {
@compileError("TODO");
} else if (native_os == .linux) {
const linux = std.os.linux;
const flags: u32 = linux.CLONE.THREAD | linux.CLONE.DETACHED |
linux.CLONE.VM | linux.CLONE.FS | linux.CLONE.FILES |
linux.CLONE.PARENT_SETTID | linux.CLONE.CHILD_CLEARTID |
linux.CLONE.SIGHAND | linux.CLONE.SYSVSEM | linux.CLONE.SETTLS;
switch (linux.errno(linux.clone(
linuxStart,
@intFromPtr(&thread.completion.mapped[thread.completion.stack_offset]),
flags,
@intFromPtr(thread),
&thread.completion.parent_tid,
thread.completion.tls_ptr,
&thread.completion.child_tid.raw,
))) {
.SUCCESS => return,
.AGAIN => return error.ThreadQuotaExceeded,
.INVAL => unreachable,
.NOMEM => return error.SystemResources,
.NOSPC => unreachable,
.PERM => unreachable,
.USERS => unreachable,
else => |err| return posix.unexpectedErrno(err),
}
} else {
@compileError("unimplemented");
}
}
fn linuxStart(raw_arg: usize) callconv(.c) u8 {
const t: *Thread = @ptrFromInt(raw_arg);
worker(t);
switch (t.completion.state.swap(.completed, .seq_cst)) {
.running => return 0,
.completed => unreachable,
.detached => t.completion.freeAndExit(),
}
unreachable;
}
fn posixStart(raw_arg: ?*anyopaque) callconv(.c) ?*anyopaque {
const t: *Thread = @ptrCast(@alignCast(raw_arg));
worker(t);
return null;
}
fn worker(t: *Thread) void {
current = t;
t.mutex.lock();
while (true) {
while (t.run_queue.popFirst()) |closure_node| {
t.mutex.unlock();
const closure: *Closure = @fieldParentPtr("node", closure_node);
closure.start(closure);
t.mutex.lock();
}
if (t.join_requested) break;
t.cond.wait(&t.mutex);
}
}
fn checkCancel(thread: *Thread) error{Canceled}!void {
const closure = thread.current_closure orelse return;
switch (@cmpxchgStrong(
CancelStatus,
&closure.cancel_status,
.requested,
.acknowledged,
.acq_rel,
.acquire,
) orelse return error.Canceled) {
.none => return,
.requested => unreachable,
.acknowledged => unreachable,
_ => return,
}
}
fn beginSyscall(thread: *Thread) error{Canceled}!void {
const closure = thread.current_closure orelse return;
switch (@cmpxchgStrong(
CancelStatus,
&closure.cancel_status,
.none,
CancelStatus.pack(thread.signal_id),
.acq_rel,
.acquire,
) orelse return) {
.none => unreachable,
.requested => {
@atomicStore(CancelStatus, &closure.cancel_status, .acknowledged, .release);
return error.Canceled;
},
.acknowledged => unreachable,
_ => unreachable,
}
}
fn endSyscall(thread: *Thread) error{Canceled}!void {
const closure = thread.current_closure orelse return;
switch (@cmpxchgStrong(
CancelStatus,
&closure.cancel_status,
CancelStatus.pack(thread.signal_id),
.none,
.acq_rel,
.acquire,
) orelse return) {
.none => unreachable,
.requested => {
@atomicStore(CancelStatus, &closure.cancel_status, .acknowledged, .release);
return error.Canceled;
},
.acknowledged => return,
_ => unreachable,
}
}
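// A minimal usage sketch of the three functions above (`blockingCall` and
// `mapError` are hypothetical); the syscall wrappers below, such as
// `dirMakePosix`, follow this same shape:
//
//   try current_thread.beginSyscall();
//   while (true) switch (posix.errno(blockingCall())) {
//       .SUCCESS => return current_thread.endSyscall(),
//       .INTR => try current_thread.checkCancel(), // retry unless canceled
//       else => |e| {
//           try current_thread.endSyscall();
//           return mapError(e);
//       },
//   };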
};
pub const Capacity = enum(isize) {
unknown = -30000,
_,
pub fn init(n: isize) Capacity {
assert(n > 0);
return @enumFromInt(n);
}
pub fn get(tc: ThreadCapacity) ?usize {
pub fn get(tc: Capacity) ?usize {
if (tc == .unknown) return null;
return @intFromEnum(tc);
}
};
threadlocal var current_closure: ?*Closure = null;
pub const default_stack_size = 16 * 1024 * 1024;
pub const use_pthreads = !is_windows and native_os != .wasi and builtin.link_libc;
const max_iovecs_len = 8;
const splat_buffer_size = 64;
@@ -59,85 +657,108 @@ comptime {
if (@TypeOf(posix.IOV_MAX) != void) assert(max_iovecs_len <= posix.IOV_MAX);
}
const CancelId = enum(usize) {
const CancelStatus = enum(usize) {
/// Cancellation has neither been requested nor checked. The async
/// operation will check the status before entering a blocking syscall.
/// This is also the status used for uninterruptible tasks.
none = 0,
canceling = std.math.maxInt(usize),
/// Cancellation has been requested and the status will be checked before
/// entering a blocking syscall.
requested = std.math.maxInt(usize) - 1,
/// Cancellation has been acknowledged and is in progress. Signals should
/// not be sent.
acknowledged = std.math.maxInt(usize),
/// Stores a `Thread.SignalId` and indicates that sending a signal to this thread
/// is needed in order to cancel. This state is set before going into
/// a blocking operation that needs to be unblocked via a signal.
_,
const ThreadId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id;
const Unpacked = union(enum) {
none,
requested,
acknowledged,
signal_id: Thread.SignalId,
};
fn currentThread() CancelId {
if (std.Thread.use_pthreads) {
return @enumFromInt(@intFromPtr(std.c.pthread_self()));
} else {
return @enumFromInt(std.Thread.getCurrentId());
}
}
fn toThreadId(cancel_id: CancelId) ThreadId {
if (std.Thread.use_pthreads) {
return @ptrFromInt(@intFromEnum(cancel_id));
} else {
return @intCast(@intFromEnum(cancel_id));
}
fn unpack(cs: CancelStatus) Unpacked {
return switch (cs) {
.none => .none,
.requested => .requested,
.acknowledged => .acknowledged,
_ => .{ .signal_id = if (use_pthreads)
@ptrFromInt(@intFromEnum(cs))
else
@intCast(@intFromEnum(cs)) },
};
}
fn pack(signal_id: Thread.SignalId) CancelStatus {
return @enumFromInt(if (use_pthreads)
@intFromPtr(signal_id)
else
signal_id);
}
};
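// The protocol, in brief (a summary of how `Thread.beginSyscall`,
// `Thread.endSyscall`, `Thread.checkCancel`, and `Closure.requestCancel`
// drive this enum):
//
//   none --beginSyscall--> signal_id --endSyscall--> none
//   any  --requestCancel--> requested
//   requested --checkCancel/beginSyscall/endSyscall--> acknowledged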
const Closure = struct {
start: Start,
node: std.SinglyLinkedList.Node = .{},
cancel_tid: CancelId,
cancel_status: CancelStatus,
/// Whether this task bumps the minimum number of threads in the pool.
is_concurrent: bool,
const Start = *const fn (*Closure) void;
fn requestCancel(closure: *Closure) void {
switch (@atomicRmw(CancelId, &closure.cancel_tid, .Xchg, .canceling, .acq_rel)) {
.none, .canceling => {},
else => |tid| {
if (std.Thread.use_pthreads) {
const rc = std.c.pthread_kill(tid.toThreadId(), .IO);
if (is_debug) assert(rc == 0);
} else if (native_os == .linux) {
_ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid.toThreadId()), .IO);
}
},
fn requestCancel(closure: *Closure, t: *Threaded) void {
var signal_id = switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) {
.none, .acknowledged, .requested => return,
.signal_id => |id| id,
};
// The task will enter a blocking syscall before checking for cancellation again.
// We can send a signal to interrupt the syscall, but if it arrives before
// the syscall instruction, it will be missed. Therefore, this code tries
// again until the cancellation request is acknowledged.
const max_attempts = 3;
for (0..max_attempts) |_| {
if (use_pthreads) {
const rc = std.c.pthread_kill(signal_id, .IO);
if (is_debug) assert(rc == 0);
} else if (native_os == .linux) {
const pid: posix.pid_t = p: {
const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic);
if (cached_pid != .unknown) break :p @intFromEnum(cached_pid);
const pid = std.os.linux.getpid();
@atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic);
break :p pid;
};
_ = std.os.linux.tgkill(pid, @bitCast(signal_id), .IO);
} else {
return;
}
// TODO make this a nanosleep with 1 << attempt duration
std.Thread.yield() catch {};
switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) {
.requested => continue,
.none, .acknowledged => return,
.signal_id => |new_id| signal_id = new_id,
}
}
}
};
pub const InitError = std.Thread.CpuCountError || Allocator.Error;
pub const CpuCountError = error{
PermissionDenied,
SystemResources,
Unsupported,
} || Io.UnexpectedError;
/// Related:
/// * `init_single_threaded`
pub fn init(
/// Must be threadsafe. Only used for the following functions:
/// * `Io.VTable.async`
/// * `Io.VTable.concurrent`
/// * `Io.VTable.groupAsync`
/// If these functions are avoided, then `Allocator.failing` may be passed
/// here.
gpa: Allocator,
) Threaded {
pub fn init() Threaded {
const cpu_count = std.Thread.getCpuCount();
var t: Threaded = .{
.allocator = gpa,
.threads = .empty,
.stack_size = std.Thread.SpawnConfig.default_stack_size,
.thread_capacity = .init(if (cpu_count) |n| .init(n) else |_| .unknown),
.thread_capacity_error = if (cpu_count) |_| null else |e| e,
.capacity = .init(if (cpu_count) |n| .init(n) else |_| .unknown),
.capacity_error = if (cpu_count) |_| null else |e| e,
.concurrent_count = 0,
.old_sig_io = undefined,
.old_sig_pipe = undefined,
.have_signal_handler = false,
};
if (cpu_count) |n| {
t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
} else |_| {}
if (posix.Sigaction != void) {
// This causes sending `posix.SIG.IO` to a thread to interrupt blocking
// syscalls, returning `posix.E.INTR`.
@@ -161,11 +782,9 @@ pub fn init(
/// * cancel requests have no effect.
/// * `deinit` is safe, but unnecessary to call.
pub const init_single_threaded: Threaded = .{
.allocator = .failing,
.threads = .empty,
.stack_size = std.Thread.SpawnConfig.default_stack_size,
.thread_capacity = .init(.init(1)),
.thread_capacity_error = null,
.capacity = .init(.init(1)),
.capacity_error = null,
.concurrent_count = 0,
.old_sig_io = undefined,
.old_sig_pipe = undefined,
@@ -173,9 +792,7 @@ pub const init_single_threaded: Threaded = .{
};
pub fn deinit(t: *Threaded) void {
const gpa = t.allocator;
t.join();
t.threads.deinit(gpa);
join(t);
if (is_windows and t.wsa.status == .initialized) {
if (ws2_32.WSACleanup() != 0) recoverableOsBugDetected();
}
@@ -186,46 +803,29 @@ pub fn deinit(t: *Threaded) void {
t.* = undefined;
}
pub fn setThreadCapacity(t: *Threaded, n: usize) void {
t.thread_capacity.store(.init(n), .monotonic);
pub fn setCapacity(t: *Threaded, n: usize) void {
t.capacity.store(.init(n), .monotonic);
}
pub fn getThreadCapacity(t: *Threaded) ?usize {
return t.thread_capacity.load(.monotonic).get();
}
pub fn getCurrentThreadId() usize {
@panic("TODO");
pub fn getCapacity(t: *Threaded) ?usize {
return t.capacity.load(.monotonic).get();
}
fn join(t: *Threaded) void {
if (builtin.single_threaded) return;
{
t.mutex.lock();
defer t.mutex.unlock();
t.join_requested = true;
}
t.cond.broadcast();
for (t.threads.items) |thread| thread.join();
}
fn worker(t: *Threaded) void {
t.mutex.lock();
defer t.mutex.unlock();
while (true) {
while (t.run_queue.popFirst()) |closure_node| {
t.mutex.unlock();
const closure: *Closure = @fieldParentPtr("node", closure_node);
const is_concurrent = closure.is_concurrent;
closure.start(closure);
t.mutex.lock();
if (is_concurrent) {
t.concurrent_count -= 1;
var it: ?*const std.SinglyLinkedList.Node = &t.main_thread.list_node;
while (it) |n| : (it = n.next) {
const thread: *Thread = @fieldParentPtr("list_node", n);
{
thread.mutex.lock();
defer thread.mutex.unlock();
thread.join_requested = true;
thread.cond.signal();
}
thread.join();
}
if (t.join_requested) break;
t.cond.wait(&t.mutex);
}
}
@@ -237,7 +837,6 @@ pub fn io(t: *Threaded) Io {
.concurrent = concurrent,
.await = await,
.cancel = cancel,
.cancelRequested = cancelRequested,
.select = select,
.groupAsync = groupAsync,
@@ -333,7 +932,6 @@ pub fn ioBasic(t: *Threaded) Io {
.concurrent = concurrent,
.await = await,
.cancel = cancel,
.cancelRequested = cancelRequested,
.select = select,
.groupAsync = groupAsync,
@@ -428,22 +1026,10 @@ const AsyncClosure = struct {
fn start(closure: *Closure) void {
const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure));
const tid: CancelId = .currentThread();
if (@cmpxchgStrong(CancelId, &closure.cancel_tid, .none, tid, .acq_rel, .acquire)) |cancel_tid| {
assert(cancel_tid == .canceling);
// Even though we already know the task is canceled, we must still
// run the closure in order to make the return value valid and in
// case there are side effects.
}
current_closure = closure;
const current_thread = Thread.current;
current_thread.current_closure = closure;
ac.func(ac.contextPointer(), ac.resultPointer());
current_closure = null;
// In case a cancel happens after successful task completion, prevents
// signal from being delivered to the thread in `requestCancel`.
if (@cmpxchgStrong(CancelId, &closure.cancel_tid, tid, .none, .acq_rel, .acquire)) |cancel_tid| {
assert(cancel_tid == .canceling);
}
current_thread.current_closure = null;
if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| {
assert(select_reset != done_reset_event);
@@ -464,14 +1050,14 @@ const AsyncClosure = struct {
}
fn init(
gpa: Allocator,
ac: *AsyncClosure,
mode: enum { async, concurrent },
result_len: usize,
result_alignment: Alignment,
context: []const u8,
context_alignment: Alignment,
func: *const fn (context: *const anyopaque, result: *anyopaque) void,
) Allocator.Error!*AsyncClosure {
) void {
const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(AsyncClosure);
const worst_case_context_offset = context_alignment.forward(@sizeOf(AsyncClosure) + max_context_misalignment);
const worst_case_result_offset = result_alignment.forward(worst_case_context_offset + context.len);
@@ -529,7 +1115,7 @@ fn async(
}
const t: *Threaded = @ptrCast(@alignCast(userdata));
const cpu_count = t.getThreadCapacity() orelse {
const may_spawn = takeCapacity(t) catch {
return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
start(context.ptr, result.ptr);
return null;
@@ -538,42 +1124,25 @@ fn async(
const gpa = t.allocator;
const ac = AsyncClosure.init(gpa, .async, result.len, result_alignment, context, context_alignment, start) catch {
returnCapacity(t);
start(context.ptr, result.ptr);
return null;
};
t.mutex.lock();
@memcpy(ac.contextPointer()[0..context.len], context);
const thread_capacity = cpu_count - 1 + t.concurrent_count;
if (may_spawn) {
// TODO Allocate Thread
t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
t.mutex.unlock();
ac.deinit(gpa);
start(context.ptr, result.ptr);
return null;
};
thread.run_queue.prepend(&ac.closure.node);
t.run_queue.prepend(&ac.closure.node);
// TODO start thread
if (t.threads.items.len < thread_capacity) {
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
if (t.threads.items.len == 0) {
assert(t.run_queue.popFirst() == &ac.closure.node);
t.mutex.unlock();
ac.deinit(gpa);
start(context.ptr, result.ptr);
return null;
}
// Rely on other workers to do it.
t.mutex.unlock();
t.cond.signal();
return @ptrCast(ac);
};
t.threads.appendAssumeCapacity(thread);
return @ptrCast(ac);
}
t.mutex.unlock();
t.cond.signal();
const thread = Thread.current;
thread.run_queue.prepend(&ac.closure.node);
return @ptrCast(ac);
}
@@ -588,7 +1157,7 @@ fn concurrent(
if (builtin.single_threaded) return error.ConcurrencyUnavailable;
const t: *Threaded = @ptrCast(@alignCast(userdata));
const cpu_count = t.getThreadCapacity() orelse 1;
const cpu_count = t.getCapacity() orelse 1;
const gpa = t.allocator;
const ac = AsyncClosure.init(gpa, .concurrent, result_len, result_alignment, context, context_alignment, start) catch {
@@ -598,9 +1167,9 @@ fn concurrent(
t.mutex.lock();
t.concurrent_count += 1;
const thread_capacity = cpu_count - 1 + t.concurrent_count;
const capacity = cpu_count - 1 + t.concurrent_count;
t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
t.threads.ensureTotalCapacity(gpa, capacity) catch {
t.mutex.unlock();
ac.deinit(gpa);
return error.ConcurrencyUnavailable;
@@ -608,7 +1177,7 @@ fn concurrent(
t.run_queue.prepend(&ac.closure.node);
if (t.threads.items.len < thread_capacity) {
if (t.threads.items.len < capacity) {
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
assert(t.run_queue.popFirst() == &ac.closure.node);
t.mutex.unlock();
@@ -635,24 +1204,13 @@ const GroupClosure = struct {
fn start(closure: *Closure) void {
const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure));
const tid: CancelId = .currentThread();
const current_thread = Thread.current;
const group = gc.group;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
const reset_event: *ResetEvent = @ptrCast(&group.context);
if (@cmpxchgStrong(CancelId, &closure.cancel_tid, .none, tid, .acq_rel, .acquire)) |cancel_tid| {
assert(cancel_tid == .canceling);
// Even though we already know the task is canceled, we must still
// run the closure in case there are side effects.
}
current_closure = closure;
current_thread.current_closure = closure;
gc.func(group, gc.contextPointer());
current_closure = null;
// In case a cancel happens after successful task completion, prevents
// signal from being delivered to the thread in `requestCancel`.
if (@cmpxchgStrong(CancelId, &closure.cancel_tid, tid, .none, .acq_rel, .acquire)) |cancel_tid| {
assert(cancel_tid == .canceling);
}
current_thread.current_closure = null;
const prev_state = group_state.fetchSub(sync_one_pending, .acq_rel);
assert((prev_state / sync_one_pending) > 0);
@@ -717,7 +1275,7 @@ fn groupAsync(
if (builtin.single_threaded) return start(group, context.ptr);
const t: *Threaded = @ptrCast(@alignCast(userdata));
const cpu_count = t.getThreadCapacity() orelse 1;
const cpu_count = t.getCapacity() orelse 1;
const gpa = t.allocator;
const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch {
@@ -730,9 +1288,9 @@ fn groupAsync(
gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
group.token = &gc.node;
const thread_capacity = cpu_count - 1 + t.concurrent_count;
const capacity = cpu_count - 1 + t.concurrent_count;
t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
t.threads.ensureTotalCapacityPrecise(gpa, capacity) catch {
t.mutex.unlock();
gc.deinit(gpa);
return start(group, context.ptr);
@@ -740,7 +1298,7 @@ fn groupAsync(
t.run_queue.prepend(&gc.closure.node);
if (t.threads.items.len < thread_capacity) {
if (t.threads.items.len < capacity) {
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
assert(t.run_queue.popFirst() == &gc.closure.node);
t.mutex.unlock();
@@ -775,7 +1333,7 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
while (true) {
const gc: *GroupClosure = @fieldParentPtr("node", node);
gc.closure.requestCancel();
gc.closure.requestCancel(t);
node = node.next orelse break;
}
reset_event.waitUncancelable();
@@ -801,7 +1359,7 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void
var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
while (true) {
const gc: *GroupClosure = @fieldParentPtr("node", node);
gc.closure.requestCancel();
gc.closure.requestCancel(t);
node = node.next orelse break;
}
}
@@ -844,21 +1402,10 @@ fn cancel(
_ = result_alignment;
const t: *Threaded = @ptrCast(@alignCast(userdata));
const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
ac.closure.requestCancel();
ac.closure.requestCancel(t);
ac.waitAndDeinit(t.allocator, result);
}
fn cancelRequested(userdata: ?*anyopaque) bool {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
const closure = current_closure orelse return false;
return @atomicLoad(CancelId, &closure.cancel_tid, .acquire) == .canceling;
}
fn checkCancel(t: *Threaded) error{Canceled}!void {
if (cancelRequested(t)) return error.Canceled;
}
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
if (builtin.single_threaded) unreachable; // Interface should have prevented this.
if (native_os == .netbsd) @panic("TODO");
@@ -1043,35 +1590,47 @@ const dirMake = switch (native_os) {
fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
const current_thread = Thread.current;
var path_buffer: [posix.PATH_MAX]u8 = undefined;
const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
try current_thread.beginSyscall();
while (true) {
try t.checkCancel();
switch (posix.errno(posix.system.mkdirat(dir.handle, sub_path_posix, mode))) {
.SUCCESS => return,
.INTR => continue,
.CANCELED => return error.Canceled,
.ACCES => return error.AccessDenied,
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.PERM => return error.PermissionDenied,
.DQUOT => return error.DiskQuota,
.EXIST => return error.PathAlreadyExists,
.FAULT => |err| return errnoBug(err),
.LOOP => return error.SymLinkLoop,
.MLINK => return error.LinkQuotaExceeded,
.NAMETOOLONG => return error.NameTooLong,
.NOENT => return error.FileNotFound,
.NOMEM => return error.SystemResources,
.NOSPC => return error.NoSpaceLeft,
.NOTDIR => return error.NotDir,
.ROFS => return error.ReadOnlyFileSystem,
// dragonfly: when dir_fd is unlinked from filesystem
.NOTCONN => return error.FileNotFound,
.ILSEQ => return error.BadPathName,
else => |err| return posix.unexpectedErrno(err),
.SUCCESS => {
try current_thread.endSyscall();
break;
},
.INTR => {
try current_thread.checkCancel();
continue;
},
else => |e| {
try current_thread.endSyscall();
switch (e) {
.CANCELED => return error.Canceled,
.ACCES => return error.AccessDenied,
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.PERM => return error.PermissionDenied,
.DQUOT => return error.DiskQuota,
.EXIST => return error.PathAlreadyExists,
.FAULT => |err| return errnoBug(err),
.LOOP => return error.SymLinkLoop,
.MLINK => return error.LinkQuotaExceeded,
.NAMETOOLONG => return error.NameTooLong,
.NOENT => return error.FileNotFound,
.NOMEM => return error.SystemResources,
.NOSPC => return error.NoSpaceLeft,
.NOTDIR => return error.NotDir,
.ROFS => return error.ReadOnlyFileSystem,
// dragonfly: when dir_fd is unlinked from filesystem
.NOTCONN => return error.FileNotFound,
.ILSEQ => return error.BadPathName,
else => |err| return posix.unexpectedErrno(err),
}
},
}
}
}
@@ -1981,6 +2540,7 @@ fn dirOpenFilePosix(
flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const current_thread = Thread.current;
var path_buffer: [posix.PATH_MAX]u8 = undefined;
const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
@@ -2017,40 +2577,52 @@ fn dirOpenFilePosix(
},
};
const fd: posix.fd_t = while (true) {
try t.checkCancel();
try current_thread.beginSyscall();
const fd = while (true) {
const rc = openat_sym(dir.handle, sub_path_posix, os_flags, @as(posix.mode_t, 0));
switch (posix.errno(rc)) {
.SUCCESS => break @intCast(rc),
.INTR => continue,
.CANCELED => return error.Canceled,
.FAULT => |err| return errnoBug(err),
.INVAL => return error.BadPathName,
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.ACCES => return error.AccessDenied,
.FBIG => return error.FileTooBig,
.OVERFLOW => return error.FileTooBig,
.ISDIR => return error.IsDir,
.LOOP => return error.SymLinkLoop,
.MFILE => return error.ProcessFdQuotaExceeded,
.NAMETOOLONG => return error.NameTooLong,
.NFILE => return error.SystemFdQuotaExceeded,
.NODEV => return error.NoDevice,
.NOENT => return error.FileNotFound,
.SRCH => return error.ProcessNotFound,
.NOMEM => return error.SystemResources,
.NOSPC => return error.NoSpaceLeft,
.NOTDIR => return error.NotDir,
.PERM => return error.PermissionDenied,
.EXIST => return error.PathAlreadyExists,
.BUSY => return error.DeviceBusy,
.OPNOTSUPP => return error.FileLocksNotSupported,
.AGAIN => return error.WouldBlock,
.TXTBSY => return error.FileBusy,
.NXIO => return error.NoDevice,
.ILSEQ => return error.BadPathName,
else => |err| return posix.unexpectedErrno(err),
.SUCCESS => {
const fd: posix.fd_t = @intCast(rc);
errdefer posix.close(fd);
try current_thread.endSyscall();
break fd;
},
.INTR => {
try current_thread.checkCancel();
continue;
},
else => |e| {
try current_thread.endSyscall();
switch (e) {
.CANCELED => return error.Canceled,
.FAULT => |err| return errnoBug(err),
.INVAL => return error.BadPathName,
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
.ACCES => return error.AccessDenied,
.FBIG => return error.FileTooBig,
.OVERFLOW => return error.FileTooBig,
.ISDIR => return error.IsDir,
.LOOP => return error.SymLinkLoop,
.MFILE => return error.ProcessFdQuotaExceeded,
.NAMETOOLONG => return error.NameTooLong,
.NFILE => return error.SystemFdQuotaExceeded,
.NODEV => return error.NoDevice,
.NOENT => return error.FileNotFound,
.SRCH => return error.ProcessNotFound,
.NOMEM => return error.SystemResources,
.NOSPC => return error.NoSpaceLeft,
.NOTDIR => return error.NotDir,
.PERM => return error.PermissionDenied,
.EXIST => return error.PathAlreadyExists,
.BUSY => return error.DeviceBusy,
.OPNOTSUPP => return error.FileLocksNotSupported,
.AGAIN => return error.WouldBlock,
.TXTBSY => return error.FileBusy,
.NXIO => return error.NoDevice,
.ILSEQ => return error.BadPathName,
else => |err| return posix.unexpectedErrno(err),
}
},
}
};
errdefer posix.close(fd);
@@ -6208,6 +6780,7 @@ fn initializeWsa(t: *Threaded) error{NetworkDown}!void {
fn doNothingSignalHandler(_: posix.SIG) callconv(.c) void {}
test {
_ = @import("Threaded/test.zig");
}
lib/std/Thread.zig +6 -313
@@ -1,6 +1,6 @@
//! This struct represents a kernel thread, and acts as a namespace for concurrency
//! primitives that operate on kernel threads. For concurrency primitives that support
//! both evented I/O and async I/O, see the respective names in the top level std namespace.
//! This struct represents a kernel thread, and acts as a namespace for
//! concurrency primitives that operate on kernel threads. For concurrency
//! primitives that interact with the I/O interface, see `std.Io`.
const std = @import("std.zig");
const builtin = @import("builtin");
@@ -20,7 +20,7 @@ pub const RwLock = @import("Thread/RwLock.zig");
pub const Pool = @import("Thread/Pool.zig");
pub const WaitGroup = @import("Thread/WaitGroup.zig");
pub const use_pthreads = native_os != .windows and native_os != .wasi and builtin.link_libc;
pub const use_pthreads = std.Io.Threaded.use_pthreads;
/// A thread-safe logical boolean value which can be `set` and `unset`.
///
@@ -422,12 +422,7 @@ pub fn getCurrentId() Id {
return Impl.getCurrentId();
}
pub const CpuCountError = error{
PermissionDenied,
SystemResources,
Unsupported,
Unexpected,
};
pub const CpuCountError = std.Io.Threaded.CpuCountError;
/// Returns the platform's view of the number of logical CPU cores available.
///
@@ -446,7 +441,7 @@ pub const SpawnConfig = struct {
/// The allocator to be used to allocate memory for the to-be-spawned thread
allocator: ?std.mem.Allocator = null,
pub const default_stack_size = 16 * 1024 * 1024;
pub const default_stack_size = std.Io.Threaded.default_stack_size;
};
pub const SpawnError = error{
@@ -1215,308 +1210,6 @@ const LinuxThreadImpl = struct {
thread: *ThreadCompletion,
const ThreadCompletion = struct {
completion: Completion = Completion.init(.running),
child_tid: std.atomic.Value(i32) = std.atomic.Value(i32).init(1),
parent_tid: i32 = undefined,
mapped: []align(std.heap.page_size_min) u8,
/// Calls `munmap(mapped.ptr, mapped.len)` then `exit(0)` without touching the stack (which lives in `mapped.ptr`).
/// Ported over from musl libc's pthread detached implementation:
/// https://github.com/ifduyue/musl/search?q=__unmapself
fn freeAndExit(self: *ThreadCompletion) noreturn {
switch (target.cpu.arch) {
.x86 => asm volatile (
\\ movl $91, %%eax # SYS_munmap
\\ movl %[ptr], %%ebx
\\ movl %[len], %%ecx
\\ int $128
\\ movl $1, %%eax # SYS_exit
\\ movl $0, %%ebx
\\ int $128
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.x86_64 => asm volatile (switch (target.abi) {
.gnux32, .muslx32 =>
\\ movl $0x4000000b, %%eax # SYS_munmap
\\ syscall
\\ movl $0x4000003c, %%eax # SYS_exit
\\ xor %%rdi, %%rdi
\\ syscall
,
else =>
\\ movl $11, %%eax # SYS_munmap
\\ syscall
\\ movl $60, %%eax # SYS_exit
\\ xor %%rdi, %%rdi
\\ syscall
,
}
:
: [ptr] "{rdi}" (@intFromPtr(self.mapped.ptr)),
[len] "{rsi}" (self.mapped.len),
),
.arm, .armeb, .thumb, .thumbeb => asm volatile (
\\ mov r7, #91 // SYS_munmap
\\ mov r0, %[ptr]
\\ mov r1, %[len]
\\ svc 0
\\ mov r7, #1 // SYS_exit
\\ mov r0, #0
\\ svc 0
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.aarch64, .aarch64_be => asm volatile (
\\ mov x8, #215 // SYS_munmap
\\ mov x0, %[ptr]
\\ mov x1, %[len]
\\ svc 0
\\ mov x8, #93 // SYS_exit
\\ mov x0, #0
\\ svc 0
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.alpha => asm volatile (
\\ ldi $0, 73 # SYS_munmap
\\ mov %[ptr], $16
\\ mov %[len], $17
\\ callsys
\\ ldi $0, 1 # SYS_exit
\\ ldi $16, 0
\\ callsys
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.hexagon => asm volatile (
\\ r6 = #215 // SYS_munmap
\\ r0 = %[ptr]
\\ r1 = %[len]
\\ trap0(#1)
\\ r6 = #93 // SYS_exit
\\ r0 = #0
\\ trap0(#1)
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.hppa => asm volatile (
\\ ldi 91, %%r20 /* SYS_munmap */
\\ copy %[ptr], %%r26
\\ copy %[len], %%r25
\\ ble 0x100(%%sr2, %%r0)
\\ ldi 1, %%r20 /* SYS_exit */
\\ ldi 0, %%r26
\\ ble 0x100(%%sr2, %%r0)
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.m68k => asm volatile (
\\ move.l #91, %%d0 // SYS_munmap
\\ move.l %[ptr], %%d1
\\ move.l %[len], %%d2
\\ trap #0
\\ move.l #1, %%d0 // SYS_exit
\\ move.l #0, %%d1
\\ trap #0
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.microblaze, .microblazeel => asm volatile (
\\ ori r12, r0, 91 # SYS_munmap
\\ ori r5, %[ptr], 0
\\ ori r6, %[len], 0
\\ brki r14, 0x8
\\ ori r12, r0, 1 # SYS_exit
\\ or r5, r0, r0
\\ brki r14, 0x8
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
// We set `sp` to the address of the current function as a workaround for a Linux
// kernel bug that caused syscalls to return EFAULT if the stack pointer is invalid.
// The bug was introduced in 46e12c07b3b9603c60fc1d421ff18618241cb081 and fixed in
// 7928eb0370d1133d0d8cd2f5ddfca19c309079d5.
.mips, .mipsel => asm volatile (
\\ move $sp, $t9
\\ li $v0, 4091 # SYS_munmap
\\ move $a0, %[ptr]
\\ move $a1, %[len]
\\ syscall
\\ li $v0, 4001 # SYS_exit
\\ li $a0, 0
\\ syscall
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.mips64, .mips64el => asm volatile (switch (target.abi) {
.gnuabin32, .muslabin32 =>
\\ li $v0, 6011 # SYS_munmap
\\ move $a0, %[ptr]
\\ move $a1, %[len]
\\ syscall
\\ li $v0, 6058 # SYS_exit
\\ li $a0, 0
\\ syscall
,
else =>
\\ li $v0, 5011 # SYS_munmap
\\ move $a0, %[ptr]
\\ move $a1, %[len]
\\ syscall
\\ li $v0, 5058 # SYS_exit
\\ li $a0, 0
\\ syscall
,
}
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.or1k => asm volatile (
\\ l.ori r11, r0, 215 # SYS_munmap
\\ l.ori r3, %[ptr]
\\ l.ori r4, %[len]
\\ l.sys 1
\\ l.ori r11, r0, 93 # SYS_exit
\\ l.ori r3, r0, r0
\\ l.sys 1
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.powerpc, .powerpcle, .powerpc64, .powerpc64le => asm volatile (
\\ li 0, 91 # SYS_munmap
\\ mr 3, %[ptr]
\\ mr 4, %[len]
\\ sc
\\ li 0, 1 # SYS_exit
\\ li 3, 0
\\ sc
\\ blr
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.riscv32, .riscv64 => asm volatile (
\\ li a7, 215 # SYS_munmap
\\ mv a0, %[ptr]
\\ mv a1, %[len]
\\ ecall
\\ li a7, 93 # SYS_exit
\\ mv a0, zero
\\ ecall
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.s390x => asm volatile (
\\ lgr %%r2, %[ptr]
\\ lgr %%r3, %[len]
\\ svc 91 # SYS_munmap
\\ lghi %%r2, 0
\\ svc 1 # SYS_exit
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.sh, .sheb => asm volatile (
\\ mov #91, r3 ! SYS_munmap
\\ mov %[ptr], r4
\\ mov %[len], r5
\\ trapa #31
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ mov #1, r3 ! SYS_exit
\\ mov #0, r4
\\ trapa #31
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
\\ or r0, r0
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.sparc => asm volatile (
\\ # See sparc64 comments below.
\\ 1:
\\ cmp %%fp, 0
\\ beq 2f
\\ nop
\\ ba 1b
\\ restore
\\ 2:
\\ mov 73, %%g1 // SYS_munmap
\\ mov %[ptr], %%o0
\\ mov %[len], %%o1
\\ t 0x3 # ST_FLUSH_WINDOWS
\\ t 0x10
\\ mov 1, %%g1 // SYS_exit
\\ mov 0, %%o0
\\ t 0x10
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.sparc64 => asm volatile (
\\ # SPARCs really don't like it when active stack frames
\\ # are unmapped (it results in a segfault), so we
\\ # force-deactivate it by running `restore` until
\\ # all frames are cleared.
\\ 1:
\\ cmp %%fp, 0
\\ beq 2f
\\ nop
\\ ba 1b
\\ restore
\\ 2:
\\ mov 73, %%g1 // SYS_munmap
\\ mov %[ptr], %%o0
\\ mov %[len], %%o1
\\ # Flush register window contents to prevent background
\\ # memory access before unmapping the stack.
\\ flushw
\\ t 0x6d
\\ mov 1, %%g1 // SYS_exit
\\ mov 0, %%o0
\\ t 0x6d
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
.loongarch32, .loongarch64 => asm volatile (
\\ or $a0, $zero, %[ptr]
\\ or $a1, $zero, %[len]
\\ ori $a7, $zero, 215 # SYS_munmap
\\ syscall 0 # call munmap
\\ ori $a0, $zero, 0
\\ ori $a7, $zero, 93 # SYS_exit
\\ syscall 0 # call exit
:
: [ptr] "r" (@intFromPtr(self.mapped.ptr)),
[len] "r" (self.mapped.len),
: .{ .memory = true }),
else => |cpu_arch| @compileError("Unsupported linux arch: " ++ @tagName(cpu_arch)),
}
unreachable;
}
};
fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl {
const page_size = std.heap.pageSize();
const Args = @TypeOf(args);