Merge pull request #7134 from alexnask/fix_std_fs_watch

The std.fs.Watch rewrite PR
This commit is contained in:
Andrew Kelley
2021-01-11 12:45:36 -08:00
committed by GitHub
5 changed files with 399 additions and 353 deletions
+387 -346
View File
@@ -3,7 +3,7 @@
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
// The MIT license requires this copyright notice to be included in all copies
// and substantial portions of the software.
const std = @import("../std.zig");
const std = @import("std");
const builtin = @import("builtin");
const event = std.event;
const assert = std.debug.assert;
@@ -24,16 +24,6 @@ const WatchEventId = enum {
Delete,
};
fn eqlString(a: []const u16, b: []const u16) bool {
if (a.len != b.len) return false;
if (a.ptr == b.ptr) return true;
return mem.compare(u16, a, b) == .Equal;
}
fn hashString(s: []const u16) u32 {
return @truncate(u32, std.hash.Wyhash.hash(0, mem.sliceAsBytes(s)));
}
const WatchEventError = error{
UserResourceLimitReached,
SystemResources,
@@ -43,7 +33,7 @@ const WatchEventError = error{
pub fn Watch(comptime V: type) type {
return struct {
channel: *event.Channel(Event.Error!Event),
channel: event.Channel(Event.Error!Event),
os_data: OsData,
allocator: *Allocator,
@@ -57,10 +47,10 @@ pub fn Watch(comptime V: type) type {
};
const KqOsData = struct {
file_table: FileTable,
table_lock: event.Lock,
file_table: FileTable,
const FileTable = std.StringHashMap(*Put);
const FileTable = std.StringHashMapUnmanaged(*Put);
const Put = struct {
putter_frame: @Frame(kqPutEvents),
cancelled: bool = false,
@@ -71,21 +61,15 @@ pub fn Watch(comptime V: type) type {
const WindowsOsData = struct {
table_lock: event.Lock,
dir_table: DirTable,
all_putters: std.atomic.Queue(Put),
ref_count: std.atomic.Int(usize),
cancelled: bool = false,
const Put = struct {
putter: anyframe,
cancelled: bool = false,
};
const DirTable = std.StringHashMap(*Dir);
const FileTable = std.HashMap([]const u16, V, hashString, eqlString);
const DirTable = std.StringHashMapUnmanaged(*Dir);
const FileTable = std.StringHashMapUnmanaged(V);
const Dir = struct {
putter_frame: @Frame(windowsDirReader),
file_table: FileTable,
table_lock: event.Lock,
dir_handle: os.windows.HANDLE,
};
};
@@ -96,8 +80,8 @@ pub fn Watch(comptime V: type) type {
table_lock: event.Lock,
cancelled: bool = false,
const WdTable = std.AutoHashMap(i32, Dir);
const FileTable = std.StringHashMap(V);
const WdTable = std.AutoHashMapUnmanaged(i32, Dir);
const FileTable = std.StringHashMapUnmanaged(V);
const Dir = struct {
dirname: []const u8,
@@ -110,19 +94,14 @@ pub fn Watch(comptime V: type) type {
pub const Event = struct {
id: Id,
data: V,
dirname: []const u8,
basename: []const u8,
pub const Id = WatchEventId;
pub const Error = WatchEventError;
};
pub fn init(allocator: *Allocator, event_buf_count: usize) !*Self {
const channel = try allocator.create(event.Channel(Event.Error!Event));
errdefer allocator.destroy(channel);
var buf = try allocator.alloc(Event.Error!Event, event_buf_count);
errdefer allocator.free(buf);
channel.init(buf);
errdefer channel.deinit();
const self = try allocator.create(Self);
errdefer allocator.destroy(self);
@@ -133,15 +112,17 @@ pub fn Watch(comptime V: type) type {
self.* = Self{
.allocator = allocator,
.channel = channel,
.channel = undefined,
.os_data = OsData{
.putter_frame = undefined,
.inotify_fd = inotify_fd,
.wd_table = OsData.WdTable.init(allocator),
.table_lock = event.Lock.init(),
.table_lock = event.Lock{},
},
};
var buf = try allocator.alloc(Event.Error!Event, event_buf_count);
self.channel.init(buf);
self.os_data.putter_frame = async self.linuxEventPutter();
return self;
},
@@ -149,82 +130,93 @@ pub fn Watch(comptime V: type) type {
.windows => {
self.* = Self{
.allocator = allocator,
.channel = channel,
.channel = undefined,
.os_data = OsData{
.table_lock = event.Lock.init(),
.table_lock = event.Lock{},
.dir_table = OsData.DirTable.init(allocator),
.ref_count = std.atomic.Int(usize).init(1),
.all_putters = std.atomic.Queue(anyframe).init(),
},
};
var buf = try allocator.alloc(Event.Error!Event, event_buf_count);
self.channel.init(buf);
return self;
},
.macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
self.* = Self{
.allocator = allocator,
.channel = channel,
.channel = undefined,
.os_data = OsData{
.table_lock = event.Lock.init(),
.table_lock = event.Lock{},
.file_table = OsData.FileTable.init(allocator),
},
};
var buf = try allocator.alloc(Event.Error!Event, event_buf_count);
self.channel.init(buf);
return self;
},
else => @compileError("Unsupported OS"),
}
}
/// All addFile calls and removeFile calls must have completed.
pub fn deinit(self: *Self) void {
switch (builtin.os.tag) {
.macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
// TODO we need to cancel the frames before destroying the lock
self.os_data.table_lock.deinit();
var it = self.os_data.file_table.iterator();
while (it.next()) |entry| {
entry.cancelled = true;
await entry.value.putter;
entry.value.cancelled = true;
// @TODO Close the fd here?
await entry.value.putter_frame;
self.allocator.free(entry.key);
self.allocator.free(entry.value);
self.allocator.destroy(entry.value);
}
self.channel.deinit();
self.allocator.destroy(self.channel.buffer_nodes);
self.allocator.destroy(self);
},
.linux => {
self.os_data.cancelled = true;
{
// Remove all directory watches linuxEventPutter will take care of
// cleaning up the memory and closing the inotify fd.
var dir_it = self.os_data.wd_table.iterator();
while (dir_it.next()) |wd_entry| {
const rc = os.linux.inotify_rm_watch(self.os_data.inotify_fd, wd_entry.key);
// Errno can only be EBADF, EINVAL if either the inotify fs or the wd are invalid
std.debug.assert(rc == 0);
}
}
await self.os_data.putter_frame;
self.allocator.destroy(self);
},
.windows => {
while (self.os_data.all_putters.get()) |putter_node| {
putter_node.cancelled = true;
await putter_node.frame;
self.os_data.cancelled = true;
var dir_it = self.os_data.dir_table.iterator();
while (dir_it.next()) |dir_entry| {
if (windows.kernel32.CancelIoEx(dir_entry.value.dir_handle, null) != 0) {
// We canceled the pending ReadDirectoryChangesW operation, but our
// frame is still suspending, now waiting indefinitely.
// Thus, it is safe to resume it ourslves
resume dir_entry.value.putter_frame;
} else {
std.debug.assert(windows.kernel32.GetLastError() == .NOT_FOUND);
// We are at another suspend point, we can await safely for the
// function to exit the loop
await dir_entry.value.putter_frame;
}
self.allocator.free(dir_entry.key);
var file_it = dir_entry.value.file_table.iterator();
while (file_it.next()) |file_entry| {
self.allocator.free(file_entry.key);
}
dir_entry.value.file_table.deinit(self.allocator);
self.allocator.destroy(dir_entry.value);
}
self.deref();
self.os_data.dir_table.deinit(self.allocator);
},
else => @compileError("Unsupported OS"),
}
}
fn ref(self: *Self) void {
_ = self.os_data.ref_count.incr();
}
fn deref(self: *Self) void {
if (self.os_data.ref_count.decr() == 1) {
self.os_data.table_lock.deinit();
var it = self.os_data.dir_table.iterator();
while (it.next()) |entry| {
self.allocator.free(entry.key);
self.allocator.destroy(entry.value);
}
self.os_data.dir_table.deinit();
self.channel.deinit();
self.allocator.destroy(self.channel.buffer_nodes);
self.allocator.destroy(self);
}
self.allocator.free(self.channel.buffer_nodes);
self.channel.deinit();
self.allocator.destroy(self);
}
pub fn addFile(self: *Self, file_path: []const u8, value: V) !?V {
@@ -237,217 +229,208 @@ pub fn Watch(comptime V: type) type {
}
fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V {
const resolved_path = try std.fs.path.resolve(self.allocator, [_][]const u8{file_path});
var resolved_path_consumed = false;
defer if (!resolved_path_consumed) self.allocator.free(resolved_path);
var realpath_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
const realpath = try os.realpath(file_path, &realpath_buf);
var close_op = try CloseOperation.start(self.allocator);
var close_op_consumed = false;
defer if (!close_op_consumed) close_op.finish();
const held = self.os_data.table_lock.acquire();
defer held.release();
const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0;
const mode = 0;
const fd = try openPosix(self.allocator, resolved_path, flags, mode);
close_op.setHandle(fd);
var put = try self.allocator.create(OsData.Put);
errdefer self.allocator.destroy(put);
put.* = OsData.Put{
.value = value,
.putter_frame = undefined,
};
put.putter_frame = async self.kqPutEvents(close_op, put);
close_op_consumed = true;
errdefer {
put.cancelled = true;
await put.putter_frame;
const gop = try self.os_data.file_table.getOrPut(self.allocator, realpath);
errdefer self.os_data.file_table.removeAssertDiscard(realpath);
if (gop.found_existing) {
const prev_value = gop.entry.value.value;
gop.entry.value.value = value;
return prev_value;
}
const result = blk: {
const held = self.os_data.table_lock.acquire();
defer held.release();
const gop = try self.os_data.file_table.getOrPut(resolved_path);
if (gop.found_existing) {
const prev_value = gop.kv.value.value;
await gop.kv.value.putter_frame;
gop.kv.value = put;
break :blk prev_value;
} else {
resolved_path_consumed = true;
gop.kv.value = put;
break :blk null;
}
gop.entry.key = try self.allocator.dupe(u8, realpath);
errdefer self.allocator.free(gop.entry.key);
gop.entry.value = try self.allocator.create(OsData.Put);
errdefer self.allocator.destroy(gop.entry.value);
gop.entry.value.* = .{
.putter_frame = undefined,
.value = value,
};
return result;
// @TODO Can I close this fd and get an error from bsdWaitKev?
const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0;
const fd = try os.open(realpath, flags, 0);
gop.entry.value.putter_frame = async self.kqPutEvents(fd, gop.entry.key, gop.entry.value);
return null;
}
fn kqPutEvents(self: *Self, close_op: *CloseOperation, put: *OsData.Put) void {
fn kqPutEvents(self: *Self, fd: os.fd_t, file_path: []const u8, put: *OsData.Put) void {
global_event_loop.beginOneEvent();
defer {
close_op.finish();
global_event_loop.finishOneEvent();
// @TODO: Remove this if we force close otherwise
os.close(fd);
}
// We need to manually do a bsdWaitKev to access the fflags.
var resume_node = event.Loop.ResumeNode.Basic{
.base = .{
.id = .Basic,
.handle = @frame(),
.overlapped = event.Loop.ResumeNode.overlapped_init,
},
.kev = undefined,
};
var kevs = [1]os.Kevent{undefined};
const kev = &kevs[0];
while (!put.cancelled) {
if (global_event_loop.bsdWaitKev(
@intCast(usize, close_op.getHandle()),
os.EVFILT_VNODE,
os.NOTE_WRITE | os.NOTE_DELETE,
)) |kev| {
// TODO handle EV_ERROR
if (kev.fflags & os.NOTE_DELETE != 0) {
self.channel.put(Self.Event{
.id = Event.Id.Delete,
.data = put.value,
});
} else if (kev.fflags & os.NOTE_WRITE != 0) {
self.channel.put(Self.Event{
.id = Event.Id.CloseWrite,
.data = put.value,
});
}
} else |err| switch (err) {
error.EventNotFound => unreachable,
error.ProcessNotFound => unreachable,
error.Overflow => unreachable,
error.AccessDenied, error.SystemResources => |casted_err| {
self.channel.put(casted_err);
},
kev.* = os.Kevent{
.ident = @intCast(usize, fd),
.filter = os.EVFILT_VNODE,
.flags = os.EV_ADD | os.EV_ENABLE | os.EV_CLEAR | os.EV_ONESHOT |
os.NOTE_WRITE | os.NOTE_DELETE | os.NOTE_REVOKE,
.fflags = 0,
.data = 0,
.udata = @ptrToInt(&resume_node.base),
};
suspend {
global_event_loop.beginOneEvent();
errdefer global_event_loop.finishOneEvent();
const empty_kevs = &[0]os.Kevent{};
_ = os.kevent(global_event_loop.os_data.kqfd, &kevs, empty_kevs, null) catch |err| switch (err) {
error.EventNotFound,
error.ProcessNotFound,
error.Overflow,
=> unreachable,
error.AccessDenied, error.SystemResources => |e| {
self.channel.put(e);
continue;
},
};
}
if (kev.flags & os.EV_ERROR != 0) {
self.channel.put(os.unexpectedErrno(os.errno(kev.data)));
continue;
}
if (kev.fflags & os.NOTE_DELETE != 0 or kev.fflags & os.NOTE_REVOKE != 0) {
self.channel.put(Self.Event{
.id = .Delete,
.data = put.value,
.dirname = std.fs.path.dirname(file_path) orelse "/",
.basename = std.fs.path.basename(file_path),
});
} else if (kev.fflags & os.NOTE_WRITE != 0) {
self.channel.put(Self.Event{
.id = .CloseWrite,
.data = put.value,
.dirname = std.fs.path.dirname(file_path) orelse "/",
.basename = std.fs.path.basename(file_path),
});
}
}
}
fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
const dirname = std.fs.path.dirname(file_path) orelse ".";
const dirname_with_null = try std.cstr.addNullByte(self.allocator, dirname);
var dirname_with_null_consumed = false;
defer if (!dirname_with_null_consumed) self.channel.free(dirname_with_null);
const dirname = std.fs.path.dirname(file_path) orelse if (file_path[0] == '/') "/" else ".";
const basename = std.fs.path.basename(file_path);
const basename_with_null = try std.cstr.addNullByte(self.allocator, basename);
var basename_with_null_consumed = false;
defer if (!basename_with_null_consumed) self.allocator.free(basename_with_null);
const wd = try os.inotify_add_watchZ(
const wd = try os.inotify_add_watch(
self.os_data.inotify_fd,
dirname_with_null.ptr,
os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK,
dirname,
os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_DELETE | os.linux.IN_EXCL_UNLINK,
);
// wd is either a newly created watch or an existing one.
const held = self.os_data.table_lock.acquire();
defer held.release();
const gop = try self.os_data.wd_table.getOrPut(wd);
const gop = try self.os_data.wd_table.getOrPut(self.allocator, wd);
errdefer self.os_data.wd_table.removeAssertDiscard(wd);
if (!gop.found_existing) {
gop.kv.value = OsData.Dir{
.dirname = dirname_with_null,
gop.entry.value = OsData.Dir{
.dirname = try self.allocator.dupe(u8, dirname),
.file_table = OsData.FileTable.init(self.allocator),
};
dirname_with_null_consumed = true;
}
const dir = &gop.kv.value;
const file_table_gop = try dir.file_table.getOrPut(basename_with_null);
const dir = &gop.entry.value;
const file_table_gop = try dir.file_table.getOrPut(self.allocator, basename);
errdefer dir.file_table.removeAssertDiscard(basename);
if (file_table_gop.found_existing) {
const prev_value = file_table_gop.kv.value;
file_table_gop.kv.value = value;
const prev_value = file_table_gop.entry.value;
file_table_gop.entry.value = value;
return prev_value;
} else {
file_table_gop.kv.value = value;
basename_with_null_consumed = true;
file_table_gop.entry.key = try self.allocator.dupe(u8, basename);
file_table_gop.entry.value = value;
return null;
}
}
fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V {
// TODO we might need to convert dirname and basename to canonical file paths ("short"?)
const dirname = try self.allocator.dupe(u8, std.fs.path.dirname(file_path) orelse ".");
var dirname_consumed = false;
defer if (!dirname_consumed) self.allocator.free(dirname);
const dirname = std.fs.path.dirname(file_path) orelse if (file_path[0] == '/') "/" else ".";
var dirname_path_space: windows.PathSpace = undefined;
dirname_path_space.len = try std.unicode.utf8ToUtf16Le(&dirname_path_space.data, dirname);
dirname_path_space.data[dirname_path_space.len] = 0;
const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, dirname);
defer self.allocator.free(dirname_utf16le);
// TODO https://github.com/ziglang/zig/issues/265
const basename = std.fs.path.basename(file_path);
const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, basename);
var basename_utf16le_null_consumed = false;
defer if (!basename_utf16le_null_consumed) self.allocator.free(basename_utf16le_null);
const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1];
const dir_handle = try windows.OpenFile(dirname_utf16le, .{
.dir = std.fs.cwd().fd,
.access_mask = windows.FILE_LIST_DIRECTORY,
.creation = windows.FILE_OPEN,
.io_mode = .blocking,
.open_dir = true,
});
var dir_handle_consumed = false;
defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle);
var basename_path_space: windows.PathSpace = undefined;
basename_path_space.len = try std.unicode.utf8ToUtf16Le(&basename_path_space.data, basename);
basename_path_space.data[basename_path_space.len] = 0;
const held = self.os_data.table_lock.acquire();
defer held.release();
const gop = try self.os_data.dir_table.getOrPut(dirname);
const gop = try self.os_data.dir_table.getOrPut(self.allocator, dirname);
errdefer self.os_data.dir_table.removeAssertDiscard(dirname);
if (gop.found_existing) {
const dir = gop.kv.value;
const held_dir_lock = dir.table_lock.acquire();
defer held_dir_lock.release();
const dir = gop.entry.value;
const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null);
const file_gop = try dir.file_table.getOrPut(self.allocator, basename);
errdefer dir.file_table.removeAssertDiscard(basename);
if (file_gop.found_existing) {
const prev_value = file_gop.kv.value;
file_gop.kv.value = value;
const prev_value = file_gop.entry.value;
file_gop.entry.value = value;
return prev_value;
} else {
file_gop.kv.value = value;
basename_utf16le_null_consumed = true;
file_gop.entry.value = value;
file_gop.entry.key = try self.allocator.dupe(u8, basename);
return null;
}
} else {
errdefer _ = self.os_data.dir_table.remove(dirname);
const dir_handle = try windows.OpenFile(dirname_path_space.span(), .{
.dir = std.fs.cwd().fd,
.access_mask = windows.FILE_LIST_DIRECTORY,
.creation = windows.FILE_OPEN,
.io_mode = .evented,
.open_dir = true,
});
errdefer windows.CloseHandle(dir_handle);
const dir = try self.allocator.create(OsData.Dir);
errdefer self.allocator.destroy(dir);
gop.entry.key = try self.allocator.dupe(u8, dirname);
errdefer self.allocator.free(gop.entry.key);
dir.* = OsData.Dir{
.file_table = OsData.FileTable.init(self.allocator),
.table_lock = event.Lock.init(),
.putter_frame = undefined,
.dir_handle = dir_handle,
};
gop.kv.value = dir;
assert((try dir.file_table.put(basename_utf16le_no_null, value)) == null);
basename_utf16le_null_consumed = true;
dir.putter_frame = async self.windowsDirReader(dir_handle, dir);
dir_handle_consumed = true;
dirname_consumed = true;
gop.entry.value = dir;
try dir.file_table.put(self.allocator, try self.allocator.dupe(u8, basename), value);
dir.putter_frame = async self.windowsDirReader(dir, gop.entry.key);
return null;
}
}
fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void {
self.ref();
defer self.deref();
defer os.close(dir_handle);
var putter_node = std.atomic.Queue(anyframe).Node{
.data = .{ .putter = @frame() },
.prev = null,
.next = null,
};
self.os_data.all_putters.put(&putter_node);
defer _ = self.os_data.all_putters.remove(&putter_node);
fn windowsDirReader(self: *Self, dir: *OsData.Dir, dirname: []const u8) void {
defer os.close(dir.dir_handle);
var resume_node = Loop.ResumeNode.Basic{
.base = Loop.ResumeNode{
.id = Loop.ResumeNode.Id.Basic,
.id = .Basic,
.handle = @frame(),
.overlapped = windows.OVERLAPPED{
.Internal = 0,
@@ -458,157 +441,193 @@ pub fn Watch(comptime V: type) type {
},
},
};
var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined;
// TODO handle this error not in the channel but in the setup
_ = windows.CreateIoCompletionPort(
dir_handle,
global_event_loop.os_data.io_port,
undefined,
undefined,
) catch |err| {
self.channel.put(err);
return;
};
global_event_loop.beginOneEvent();
defer global_event_loop.finishOneEvent();
while (!putter_node.data.cancelled) {
{
// TODO only 1 beginOneEvent for the whole function
global_event_loop.beginOneEvent();
errdefer global_event_loop.finishOneEvent();
errdefer {
_ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped);
}
suspend {
_ = windows.kernel32.ReadDirectoryChangesW(
dir_handle,
&event_buf,
@intCast(windows.DWORD, event_buf.len),
windows.FALSE, // watch subtree
windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME |
windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE |
windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS |
windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY,
null, // number of bytes transferred (unused for async)
&resume_node.base.overlapped,
null, // completion routine - unused because we use IOCP
);
}
while (!self.os_data.cancelled) main_loop: {
suspend {
_ = windows.kernel32.ReadDirectoryChangesW(
dir.dir_handle,
&event_buf,
event_buf.len,
windows.FALSE, // watch subtree
windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME |
windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE |
windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS |
windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY,
null, // number of bytes transferred (unused for async)
&resume_node.base.overlapped,
null, // completion routine - unused because we use IOCP
);
}
var bytes_transferred: windows.DWORD = undefined;
if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
const err = switch (windows.kernel32.GetLastError()) {
if (windows.kernel32.GetOverlappedResult(
dir.dir_handle,
&resume_node.base.overlapped,
&bytes_transferred,
windows.FALSE,
) == 0) {
const potential_error = windows.kernel32.GetLastError();
const err = switch (potential_error) {
.OPERATION_ABORTED, .IO_INCOMPLETE => err_blk: {
if (self.os_data.cancelled)
break :main_loop
else
break :err_blk windows.unexpectedError(potential_error);
},
else => |err| windows.unexpectedError(err),
};
self.channel.put(err);
} else {
// can't use @bytesToSlice because of the special variable length name field
var ptr = event_buf[0..].ptr;
var ptr: [*]u8 = &event_buf;
const end_ptr = ptr + bytes_transferred;
var ev: *windows.FILE_NOTIFY_INFORMATION = undefined;
while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) {
ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr);
while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) {
const ev = @ptrCast(*const windows.FILE_NOTIFY_INFORMATION, ptr);
const emit = switch (ev.Action) {
windows.FILE_ACTION_REMOVED => WatchEventId.Delete,
windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite,
windows.FILE_ACTION_MODIFIED => .CloseWrite,
else => null,
};
if (emit) |id| {
const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2];
const user_value = blk: {
const held = dir.table_lock.acquire();
defer held.release();
const basename_ptr = @ptrCast([*]u16, ptr + @sizeOf(windows.FILE_NOTIFY_INFORMATION));
const basename_utf16le = basename_ptr[0 .. ev.FileNameLength / 2];
var basename_data: [std.fs.MAX_PATH_BYTES]u8 = undefined;
const basename = basename_data[0 .. std.unicode.utf16leToUtf8(&basename_data, basename_utf16le) catch unreachable];
if (dir.file_table.get(basename_utf16le)) |entry| {
break :blk entry.value;
} else {
break :blk null;
}
};
if (user_value) |v| {
if (dir.file_table.getEntry(basename)) |entry| {
self.channel.put(Event{
.id = id,
.data = v,
.data = entry.value,
.dirname = dirname,
.basename = entry.key,
});
}
}
if (ev.NextEntryOffset == 0) break;
ptr = @alignCast(@alignOf(windows.FILE_NOTIFY_INFORMATION), ptr + ev.NextEntryOffset);
}
}
}
}
pub fn removeFile(self: *Self, file_path: []const u8) ?V {
@panic("TODO");
pub fn removeFile(self: *Self, file_path: []const u8) !?V {
switch (builtin.os.tag) {
.linux => {
const dirname = std.fs.path.dirname(file_path) orelse if (file_path[0] == '/') "/" else ".";
const basename = std.fs.path.basename(file_path);
const held = self.os_data.table_lock.acquire();
defer held.release();
const dir = self.os_data.wd_table.get(dirname) orelse return null;
if (dir.file_table.remove(basename)) |file_entry| {
self.allocator.free(file_entry.key);
return file_entry.value;
}
return null;
},
.windows => {
const dirname = std.fs.path.dirname(file_path) orelse if (file_path[0] == '/') "/" else ".";
const basename = std.fs.path.basename(file_path);
const held = self.os_data.table_lock.acquire();
defer held.release();
const dir = self.os_data.dir_table.get(dirname) orelse return null;
if (dir.file_table.remove(basename)) |file_entry| {
self.allocator.free(file_entry.key);
return file_entry.value;
}
return null;
},
.macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
var realpath_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
const realpath = try os.realpath(file_path, &realpath_buf);
const held = self.os_data.table_lock.acquire();
defer held.release();
const entry = self.os_data.file_table.get(realpath) orelse return null;
entry.value.cancelled = true;
// @TODO Close the fd here?
await entry.value.putter_frame;
self.allocator.free(entry.key);
self.allocator.destroy(entry.value);
self.os_data.file_table.removeAssertDiscard(realpath);
},
else => @compileError("Unsupported OS"),
}
}
fn linuxEventPutter(self: *Self) void {
global_event_loop.beginOneEvent();
defer {
self.os_data.table_lock.deinit();
var wd_it = self.os_data.wd_table.iterator();
while (wd_it.next()) |wd_entry| {
var file_it = wd_entry.value.file_table.iterator();
while (file_it.next()) |file_entry| {
self.allocator.free(file_entry.key);
}
self.allocator.free(wd_entry.value.dirname);
wd_entry.value.file_table.deinit();
}
self.os_data.wd_table.deinit();
global_event_loop.finishOneEvent();
std.debug.assert(self.os_data.wd_table.count() == 0);
self.os_data.wd_table.deinit(self.allocator);
os.close(self.os_data.inotify_fd);
self.channel.deinit();
self.allocator.free(self.channel.buffer_nodes);
self.channel.deinit();
global_event_loop.finishOneEvent();
}
var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;
while (!self.os_data.cancelled) {
const rc = os.linux.read(self.os_data.inotify_fd, &event_buf, event_buf.len);
const errno = os.linux.getErrno(rc);
switch (errno) {
0 => {
// can't use @bytesToSlice because of the special variable length name field
var ptr = event_buf[0..].ptr;
const end_ptr = ptr + event_buf.len;
var ev: *os.linux.inotify_event = undefined;
while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) {
ev = @ptrCast(*os.linux.inotify_event, ptr);
if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
// `ev.len` counts all bytes in `ev.name` including terminating null byte.
const basename_with_null = basename_ptr[0..ev.len];
const user_value = blk: {
const held = self.os_data.table_lock.acquire();
defer held.release();
const bytes_read = global_event_loop.read(self.os_data.inotify_fd, &event_buf, false) catch unreachable;
const dir = &self.os_data.wd_table.get(ev.wd).?.value;
if (dir.file_table.get(basename_with_null)) |entry| {
break :blk entry.value;
} else {
break :blk null;
}
};
if (user_value) |v| {
self.channel.put(Event{
.id = WatchEventId.CloseWrite,
.data = v,
});
}
}
var ptr: [*]u8 = &event_buf;
const end_ptr = ptr + bytes_read;
while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) {
const ev = @ptrCast(*const os.linux.inotify_event, ptr);
if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
const basename = std.mem.span(@ptrCast([*:0]u8, basename_ptr));
ptr = @alignCast(@alignOf(os.linux.inotify_event), ptr + @sizeOf(os.linux.inotify_event) + ev.len);
const dir = &self.os_data.wd_table.get(ev.wd).?;
if (dir.file_table.getEntry(basename)) |file_value| {
self.channel.put(Event{
.id = .CloseWrite,
.data = file_value.value,
.dirname = dir.dirname,
.basename = file_value.key,
});
}
},
os.linux.EINTR => continue,
os.linux.EINVAL => unreachable,
os.linux.EFAULT => unreachable,
os.linux.EAGAIN => {
global_event_loop.linuxWaitFd(self.os_data.inotify_fd, os.linux.EPOLLET | os.linux.EPOLLIN | os.EPOLLONESHOT);
},
else => unreachable,
} else if (ev.mask & os.linux.IN_IGNORED == os.linux.IN_IGNORED) {
// Directory watch was removed
const held = self.os_data.table_lock.acquire();
defer held.release();
if (self.os_data.wd_table.remove(ev.wd)) |*wd_entry| {
var file_it = wd_entry.value.file_table.iterator();
while (file_it.next()) |file_entry| {
self.allocator.free(file_entry.key);
}
self.allocator.free(wd_entry.value.dirname);
wd_entry.value.file_table.deinit(self.allocator);
}
} else if (ev.mask & os.linux.IN_DELETE == os.linux.IN_DELETE) {
// File or directory was removed or deleted
const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
const basename = std.mem.span(@ptrCast([*:0]u8, basename_ptr));
const dir = &self.os_data.wd_table.get(ev.wd).?;
if (dir.file_table.getEntry(basename)) |file_value| {
self.channel.put(Event{
.id = .Delete,
.data = file_value.value,
.dirname = dir.dirname,
.basename = file_value.key,
});
}
}
ptr = @alignCast(@alignOf(os.linux.inotify_event), ptr + @sizeOf(os.linux.inotify_event) + ev.len);
}
}
}
@@ -617,19 +636,19 @@ pub fn Watch(comptime V: type) type {
const test_tmp_dir = "std_event_fs_test";
test "write a file, watch it, write it again" {
// TODO re-enable this test
if (true) return error.SkipZigTest;
test "write a file, watch it, write it again, delete it" {
if (!std.io.is_async) return error.SkipZigTest;
// TODO https://github.com/ziglang/zig/issues/1908
if (builtin.single_threaded) return error.SkipZigTest;
try fs.cwd().makePath(test_tmp_dir);
defer fs.cwd().deleteTree(test_tmp_dir) catch {};
try std.fs.cwd().makePath(test_tmp_dir);
defer std.fs.cwd().deleteTree(test_tmp_dir) catch {};
const allocator = std.heap.page_allocator;
return testFsWatch(&allocator);
return testWriteWatchWriteDelete(std.testing.allocator);
}
fn testFsWatch(allocator: *Allocator) !void {
const file_path = try std.fs.path.join(allocator, [_][]const u8{ test_tmp_dir, "file.txt" });
fn testWriteWatchWriteDelete(allocator: *Allocator) !void {
const file_path = try std.fs.path.join(allocator, &[_][]const u8{ test_tmp_dir, "file.txt" });
defer allocator.free(file_path);
const contents =
@@ -639,9 +658,10 @@ fn testFsWatch(allocator: *Allocator) !void {
const line2_offset = 7;
// first just write then read the file
try writeFile(allocator, file_path, contents);
try std.fs.cwd().writeFile(file_path, contents);
const read_contents = try readFile(allocator, file_path, 1024 * 1024);
const read_contents = try std.fs.cwd().readFileAlloc(allocator, file_path, 1024 * 1024);
defer allocator.free(read_contents);
testing.expectEqualSlices(u8, contents, read_contents);
// now watch the file
@@ -650,28 +670,49 @@ fn testFsWatch(allocator: *Allocator) !void {
testing.expect((try watch.addFile(file_path, {})) == null);
const ev = watch.channel.get();
var ev = async watch.channel.get();
var ev_consumed = false;
defer if (!ev_consumed) await ev;
defer if (!ev_consumed) {
_ = await ev;
};
// overwrite line 2
const fd = try await openReadWrite(file_path, File.default_mode);
const file = try std.fs.cwd().openFile(file_path, .{ .read = true, .write = true });
{
defer os.close(fd);
try pwritev(allocator, fd, []const []const u8{"lorem ipsum"}, line2_offset);
defer file.close();
const write_contents = "lorem ipsum";
var iovec = [_]os.iovec_const{.{
.iov_base = write_contents,
.iov_len = write_contents.len,
}};
_ = try file.pwritevAll(&iovec, line2_offset);
}
ev_consumed = true;
switch ((try await ev).id) {
WatchEventId.CloseWrite => {},
WatchEventId.Delete => @panic("wrong event"),
.CloseWrite => {
ev_consumed = true;
},
.Delete => @panic("wrong event"),
}
const contents_updated = try readFile(allocator, file_path, 1024 * 1024);
const contents_updated = try std.fs.cwd().readFileAlloc(allocator, file_path, 1024 * 1024);
defer allocator.free(contents_updated);
testing.expectEqualSlices(u8,
\\line 1
\\lorem ipsum
, contents_updated);
// TODO test deleting the file and then re-adding it. we should get events for both
ev = async watch.channel.get();
ev_consumed = false;
try std.fs.cwd().deleteFile(file_path);
switch ((try await ev).id) {
.Delete => {
ev_consumed = true;
},
.CloseWrite => @panic("wrong event"),
}
}
// TODO Test: Add another file watch, remove the old file watch, get an event in the new
+3
View File
@@ -569,6 +569,9 @@ pub const EV_ONESHOT = 0x0010;
/// clear event state after reporting
pub const EV_CLEAR = 0x0020;
/// error, event data contains errno
pub const EV_ERROR = 0x4000;
/// force immediate event output
/// ... with or without EV_ERROR
/// ... use KEVENT_FLAG_ERROR_EVENTS
+6 -5
View File
@@ -109,7 +109,12 @@ pub fn OpenFile(sub_path_w: []const u16, options: OpenFileOptions) OpenError!HAN
0,
);
switch (rc) {
.SUCCESS => return result,
.SUCCESS => {
if (std.io.is_async and options.io_mode == .evented) {
_ = CreateIoCompletionPort(result, std.event.Loop.instance.?.os_data.io_port, undefined, undefined) catch undefined;
}
return result;
},
.OBJECT_NAME_INVALID => unreachable,
.OBJECT_NAME_NOT_FOUND => return error.FileNotFound,
.OBJECT_PATH_NOT_FOUND => return error.FileNotFound,
@@ -418,8 +423,6 @@ pub fn ReadFile(in_hFile: HANDLE, buffer: []u8, offset: ?u64, io_mode: std.io.Mo
},
},
};
// TODO only call create io completion port once per fd
_ = CreateIoCompletionPort(in_hFile, loop.os_data.io_port, undefined, undefined) catch undefined;
loop.beginOneEvent();
suspend {
// TODO handle buffer bigger than DWORD can hold
@@ -500,8 +503,6 @@ pub fn WriteFile(
},
},
};
// TODO only call create io completion port once per fd
_ = CreateIoCompletionPort(handle, loop.os_data.io_port, undefined, undefined) catch undefined;
loop.beginOneEvent();
suspend {
const adjusted_len = math.cast(DWORD, bytes.len) catch maxInt(DWORD);
+2 -1
View File
@@ -813,7 +813,8 @@ pub const FILE_NOTIFY_INFORMATION = extern struct {
NextEntryOffset: DWORD,
Action: DWORD,
FileNameLength: DWORD,
FileName: [1]WCHAR,
// Flexible array member
// FileName: [1]WCHAR,
};
pub const FILE_ACTION_ADDED = 0x00000001;
+1 -1
View File
@@ -8,7 +8,7 @@ usingnamespace @import("bits.zig");
pub extern "kernel32" fn AddVectoredExceptionHandler(First: c_ulong, Handler: ?VECTORED_EXCEPTION_HANDLER) callconv(WINAPI) ?*c_void;
pub extern "kernel32" fn RemoveVectoredExceptionHandler(Handle: HANDLE) callconv(WINAPI) c_ulong;
pub extern "kernel32" fn CancelIoEx(hFile: HANDLE, lpOverlapped: LPOVERLAPPED) callconv(WINAPI) BOOL;
pub extern "kernel32" fn CancelIoEx(hFile: HANDLE, lpOverlapped: ?LPOVERLAPPED) callconv(WINAPI) BOOL;
pub extern "kernel32" fn CloseHandle(hObject: HANDLE) callconv(WINAPI) BOOL;