Io.Dispatch: introduce grand central dispatch io impl

This commit is contained in:
Jacob Young
2026-02-09 14:19:27 -05:00
parent 469bf6af07
commit 2fa1a78491
14 changed files with 6163 additions and 945 deletions
+13 -31
View File
@@ -34,11 +34,11 @@ watch_paths: std.StringArrayHashMapUnmanaged([]const *std.Build.Step),
/// The semaphore we use to block the thread calling `wait` until the callback determines a relevant
/// event has occurred. This is retained across `wait` calls for simplicity and efficiency.
waiting_semaphore: dispatch_semaphore_t,
waiting_semaphore: dispatch.semaphore_t,
/// This dispatch queue is created by us and executes serially. It exists exclusively to trigger the
/// callbacks of the FSEventStream we create. This is not in use outside of `wait`, but is retained
/// across `wait` calls for simplicity and efficiency.
dispatch_queue: dispatch_queue_t,
dispatch_queue: dispatch.queue_t,
/// In theory, this field avoids race conditions. In practice, it is essentially unused at the time
/// of writing. See the comment at the start of `wait` for details.
since_event: FSEventStreamEventId,
@@ -57,7 +57,7 @@ const ResolvedSymbols = struct {
latency: CFTimeInterval,
flags: FSEventStreamCreateFlags,
) callconv(.c) FSEventStreamRef,
FSEventStreamSetDispatchQueue: *const fn (stream: FSEventStreamRef, queue: dispatch_queue_t) callconv(.c) void,
FSEventStreamSetDispatchQueue: *const fn (stream: FSEventStreamRef, queue: dispatch.queue_t) callconv(.c) void,
FSEventStreamStart: *const fn (stream: FSEventStreamRef) callconv(.c) bool,
FSEventStreamStop: *const fn (stream: FSEventStreamRef) callconv(.c) void,
FSEventStreamInvalidate: *const fn (stream: FSEventStreamRef) callconv(.c) void,
@@ -80,7 +80,7 @@ const ResolvedSymbols = struct {
kCFAllocatorUseContext: *const CFAllocatorRef,
};
pub fn init(cwd_path: []const u8) error{ OpenFrameworkFailed, MissingCoreServicesSymbol }!FsEvents {
pub fn init(cwd_path: []const u8) error{ OpenFrameworkFailed, MissingCoreServicesSymbol, SystemResources }!FsEvents {
var core_services = std.DynLib.open("/System/Library/Frameworks/CoreServices.framework/CoreServices") catch
return error.OpenFrameworkFailed;
errdefer core_services.close();
@@ -96,8 +96,8 @@ pub fn init(cwd_path: []const u8) error{ OpenFrameworkFailed, MissingCoreService
.paths_arena = .{},
.watch_roots = &.{},
.watch_paths = .empty,
.waiting_semaphore = dispatch_semaphore_create(0),
.dispatch_queue = dispatch_queue_create("zig-watch", .SERIAL),
.waiting_semaphore = dispatch.semaphore_create(0) orelse return error.SystemResources,
.dispatch_queue = dispatch.queue_create("zig-watch", .SERIAL()) orelse return error.SystemResources,
// Not `.since_now`, because this means we can init `FsEvents` *before* we do work in order
// to notice any changes which happened during said work.
.since_event = resolved_symbols.FSEventsGetCurrentEventId(),
@@ -106,8 +106,8 @@ pub fn init(cwd_path: []const u8) error{ OpenFrameworkFailed, MissingCoreService
}
pub fn deinit(fse: *FsEvents, gpa: Allocator, io: Io) void {
dispatch_release(fse.waiting_semaphore);
dispatch_release(fse.dispatch_queue);
fse.waiting_semaphore.as_object().release();
fse.dispatch_queue.as_object().release();
fse.core_services.close(io);
gpa.free(fse.watch_roots);
@@ -275,9 +275,9 @@ pub fn wait(fse: *FsEvents, gpa: Allocator, timeout_ns: ?u64) error{ OutOfMemory
defer rs.FSEventStreamInvalidate(event_stream);
if (!rs.FSEventStreamStart(event_stream)) return error.StartFailed;
defer rs.FSEventStreamStop(event_stream);
const result = dispatch_semaphore_wait(fse.waiting_semaphore, timeout: {
const ns = timeout_ns orelse break :timeout .forever;
break :timeout dispatch_time(.now, @intCast(ns));
const result = fse.waiting_semaphore.wait(timeout: {
const ns = timeout_ns orelse break :timeout .FOREVER;
break :timeout .time(.NOW, @intCast(ns));
});
return switch (result) {
0 => .dirty,
@@ -382,7 +382,7 @@ fn eventCallback(
}
if (any_dirty) {
fse.since_event = rs.FSEventStreamGetLatestEventId(stream);
_ = dispatch_semaphore_signal(fse.waiting_semaphore);
_ = fse.waiting_semaphore.signal();
}
}
fn dirStartsWith(path: []const u8, prefix: []const u8) bool {
@@ -392,25 +392,6 @@ fn dirStartsWith(path: []const u8, prefix: []const u8) bool {
return true; // `path` is `/foo/bar/...`, `prefix` is `/foo/bar`
}
const dispatch_time_t = enum(u64) {
now = 0,
forever = std.math.maxInt(u64),
_,
};
extern fn dispatch_time(base: dispatch_time_t, delta_ns: i64) dispatch_time_t;
const dispatch_semaphore_t = *opaque {};
extern fn dispatch_semaphore_create(value: isize) dispatch_semaphore_t;
extern fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) isize;
extern fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) isize;
const dispatch_queue_t = *opaque {};
const dispatch_queue_attr_t = ?*opaque {
const SERIAL: dispatch_queue_attr_t = null;
};
extern fn dispatch_queue_create(label: [*:0]const u8, attr: dispatch_queue_attr_t) dispatch_queue_t;
extern fn dispatch_release(object: *anyopaque) void;
const CFAllocatorRef = ?*const opaque {};
const CFArrayRef = *const opaque {};
const CFStringRef = *const opaque {};
@@ -489,6 +470,7 @@ const FSEventStreamEventFlags = packed struct(u32) {
_: u24 = 0,
};
const dispatch = std.c.dispatch;
const std = @import("std");
const Io = std.Io;
const assert = std.debug.assert;