Skip to content

Commit

Permalink
Fix macOS DirectoryWatcher and signals
Browse files Browse the repository at this point in the history
  • Loading branch information
etcimon committed Dec 17, 2023
1 parent 77292e9 commit 7d37cbf
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ build
*.exe
*.*~
dub.selections.json
*.log
118 changes: 83 additions & 35 deletions source/libasync/posix.d
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import std.conv : to;
import std.datetime : Duration, msecs, seconds, SysTime;
import std.traits : isIntegral;
import std.typecons : Tuple, tuple;
import memutils.vector : Array;
import memutils.vector : Array, Vector;
import std.exception;

import core.stdc.errno;
Expand Down Expand Up @@ -122,7 +122,10 @@ package:


m_instanceId = i;
static if (!EPOLL) g_threadId = new size_t(cast(size_t)m_instanceId);
static if (!EPOLL) {
static if (LOG) tracef("Set global thread ID: %d", m_instanceId);
g_threadId = new size_t(cast(size_t)m_instanceId);
}

core.atomic.atomicOp!"+="(i, cast(ushort) 1);
m_evLoop = evl;
Expand Down Expand Up @@ -234,8 +237,8 @@ package:
try {
if (!gs_queueMutex) {
gs_queueMutex = ThreadMem.alloc!ReadWriteMutex();
gs_signalQueue = Array!(Array!AsyncSignal)();
gs_idxQueue = Array!(Array!size_t)();
gs_signalQueue = Vector!(Array!AsyncSignal)();
gs_idxQueue = Vector!(Array!size_t)();
}
if (g_evIdxAvailable.empty) {
g_evIdxAvailable.reserve(32);
Expand Down Expand Up @@ -294,8 +297,12 @@ package:
catch (Exception e) { assert(false, "Failed to free resources"); }

}
else
else {
close(m_evSignal.fd);
try ThreadMem.free(m_evSignal);
catch (Exception e) { assert(false, "Failed to free resources"); }
close(m_kqueuefd);
}
}

@property const(StatusInfo) status() const {
Expand Down Expand Up @@ -328,7 +335,7 @@ package:

static kevent_t[] events;
if (events.length == 0) {
try events = ThreadMem.alloc!(kevent_t[])(128);
try events = new kevent_t[128];
catch (Exception e) { assert(false, "Could not allocate events array"); }
}
}
Expand Down Expand Up @@ -425,7 +432,7 @@ package:
case EventType.Notifier:

static if (LOG) trace("Got notifier!");
static if (LOG) tracef("Thread: %d", Thread.getThis().id());
static if (LOG) tracef("Thread: %d", cast(int)Thread.getThis().id());
try info.evObj.notifierHandler();
catch (Exception e) {
setInternalError!"notifierHandler"(Status.ERROR);
Expand All @@ -434,7 +441,7 @@ package:

case EventType.DirectoryWatcher:
static if (LOG) log("Got DirectoryWatcher event!");
static if (LOG) tracef("Thread: %d", Thread.getThis().id());
static if (LOG) tracef("Thread: %d", *g_threadId);
static if (!EPOLL) {
// in KQUEUE all events will be consumed here, because they must be pre-processed
try {
Expand All @@ -456,6 +463,7 @@ package:

if (fi == DWFolderInfo.init) {
DWFileInfo tmp = m_dwFiles.get(cast(fd_t)_event.ident, DWFileInfo.init);
static if (LOG) tracef("Invalid file descriptor: %d thr %d", cast(fd_t)_event.ident, *g_threadId);
assert(tmp != DWFileInfo.init, "The event loop returned an invalid file's file descriptor for the directory watcher");
fi = m_dwFolders.get(cast(fd_t) tmp.folder, DWFolderInfo.init);
assert(fi != DWFolderInfo.init, "The event loop returned an invalid folder file descriptor for the directory watcher");
Expand All @@ -481,7 +489,7 @@ package:

case EventType.Timer:
static if (LOG) try trace("Got timer! " ~ info.fd.to!string); catch (Throwable e) {}
static if (LOG) tracef("Thread: %d", Thread.getThis().id());
static if (LOG) tracef("Thread: %d", *g_threadId);
static if (EPOLL) {
static long val;
import core.sys.posix.unistd : read;
Expand All @@ -506,7 +514,7 @@ package:

case EventType.DNSResolver:
static if (LOG) try trace("Got signal!"); catch (Throwable e) {}
static if (LOG) tracef("Thread: %d", Thread.getThis().id());
static if (LOG) tracef("Thread: %d", *g_threadId);

static if (EPOLL) {
import libasync.internals.socket_compat : freeaddrinfo, addrinfo, gaicb;
Expand Down Expand Up @@ -542,7 +550,7 @@ package:

case EventType.Signal:
static if (LOG) try trace("Got signal!"); catch (Throwable e) {}
static if (LOG) tracef("Thread: %d", Thread.getThis().id());
static if (LOG) tracef("Thread: %d", *g_threadId);

static if (EPOLL) {

Expand Down Expand Up @@ -570,18 +578,24 @@ package:
try sigarr = new AsyncSignal[32];
catch (Exception e) { assert(false, "Could not allocate signals array"); }
}
while (popSignals(sigarr)) {
foreach (AsyncSignal sig; sigarr)
trace("Popping signals");
bool more = false;
do {
more = popSignals(sigarr);
trace("Popped signals");
foreach (ref AsyncSignal sig; sigarr)
{
shared AsyncSignal ptr = cast(shared AsyncSignal) sig;
sig = null;

if (ptr is null)
break;
try (cast(shared AsyncSignal)sig).handler();
try ptr.handler();
catch (Exception e) {
setInternalError!"signal handler"(Status.ERROR);
}
}
}
} while (more);
}
break;

Expand Down Expand Up @@ -1314,6 +1328,7 @@ package:
Path filePath = Path(de.name);
if (!filePath.absolute)
filePath = path ~ filePath;
filePath.normalize();
fd_t fwd;
if (info.recursive && isDir(filePath.toNativeString()))
fwd = addRecursive(filePath, true);
Expand Down Expand Up @@ -1456,13 +1471,16 @@ package:
Array!fd_t remove_file_list;
// search for subfolders and unset them / close their wd
foreach (ref DWFolderInfo folder; m_dwFolders) {
if (folder.fd == fi.fd && folder.wi.path.startsWith(fi.wi.path)) {
auto folderpath = folder.wi.path;
folderpath.normalize();
if (folder.fd == fi.fd && folderpath.startsWith(fi.wi.path)) {

if (!event_unset(folder.wi.wd))
return false;

// search for subfiles, close their descriptors and remove them from the file list
foreach (ref const fd_t fwd, ref const DWFileInfo file; m_dwFiles) {

if (file.folder == folder.wi.wd) {
close(fwd);
remove_file_list.insertBack(fwd); // to be removed from m_dwFiles without affecting the loop
Expand Down Expand Up @@ -1531,7 +1549,7 @@ package:
Path entryPath = Path(de.name);
if (!entryPath.absolute)
entryPath = fi.wi.path ~ entryPath;

assumeWontThrow(entryPath.normalize());
if (fi.wi.recursive && isDir(entryPath.toNativeString())) {

watch(fd, WatchInfo(fi.wi.events, entryPath, fi.wi.recursive, 0) );
Expand All @@ -1540,6 +1558,7 @@ package:
auto subsubpath = Path(de.name);
if (!subsubpath.absolute)
subsubpath = subpath ~ subsubpath;
assumeWontThrow(subsubpath.normalize());
changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
if (isDir(subsubpath.toNativeString()))
genEvents(subsubpath);
Expand Down Expand Up @@ -1591,6 +1610,7 @@ package:
setInternalError!"m_dwFolders[ev.wd]"(Status.ERROR, "Could not retrieve wd index in folders");
return 0;
}
assumeWontThrow(path.normalize());

dst[i] = DWChangeInfo(evtype, path);
import std.file : isDir;
Expand Down Expand Up @@ -1650,7 +1670,12 @@ package:
}
if (cnt == changes.length)
changes.clear();
else if (cnt > 0) (*changes)[] = (*changes)[cnt .. $];
else if (cnt > 0) {
import memutils.vector;
Vector!DWChangeInfo tmp;
tmp[] = (*changes)[cnt .. $];
(*changes)[] = tmp[];
}
}
catch (Exception e) {
setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg);
Expand Down Expand Up @@ -1789,7 +1814,7 @@ package:

gaicb* host = assumeWontThrow(ThreadMem.alloc!gaicb());
sigevent* sig = &req.sig;
static if (LOG) tracef("Creating from thread: %d", Thread.getThis().id());
static if (LOG) tracef("Creating from thread: %d", g_threadId);
host.ar_request = hints;
host.ar_name = url.toStringz();
host.ar_service = null;
Expand Down Expand Up @@ -2114,6 +2139,7 @@ private:
foreach (DirEntry de; dirEntries(path.toNativeString(), SpanMode.shallow)) {
//writeln(de.name);
Path entryPath = Path(de.name);
entryPath.normalize();
bool found;

if (fi.wi.recursive && de.isDir()) {
Expand All @@ -2128,7 +2154,8 @@ private:
// compare it to the cached list fixme: make it faster using another container?
foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
if (file.folder != wd) continue; // this file isn't in the evented folder
if (file.path == entryPath) {
auto filepath = file.path;
if (filepath == entryPath) {
found = true;
static if (LOG) log("File modified? " ~ entryPath.toNativeString() ~ " at: " ~ de.timeLastModified.to!string ~ " vs: " ~ file.lastModified.to!string);
// Check if it was modified
Expand All @@ -2137,7 +2164,7 @@ private:
DWFileInfo dwf = file;
dwf.lastModified = de.timeLastModified;
m_dwFiles[id] = dwf;
changes.insertBack(DWChangeInfo(DWFileEvent.MODIFIED, file.path));
changes.insertBack(DWChangeInfo(DWFileEvent.MODIFIED, filepath));
}
break;
}
Expand All @@ -2158,6 +2185,7 @@ private:
auto subsubpath = Path(de.name);
if (!subsubpath.absolute())
subsubpath = subpath ~ subsubpath;
subsubpath.normalize();
changes.insertBack(DWChangeInfo(DWFileEvent.CREATED, subsubpath));
if (isDir(subsubpath.toNativeString()))
genEvents(subsubpath);
Expand All @@ -2175,6 +2203,7 @@ private:

import core.sys.posix.fcntl : open;
fd_t fwd = open(entryPath.toNativeString().toStringz, O_EVTONLY);
static if (LOG) tracef("Register file descriptor thr%d %d for %s", *g_threadId, fwd, entryPath.toNativeString());
if (catchError!"open(watch)"(fwd))
return 0;

Expand All @@ -2195,20 +2224,25 @@ private:
// This file/folder is now current. This avoids a deletion event.
currFiles.insert(entryPath);
}

/// Now search for files/folders that were deleted in this directory (no recursivity needed).
/// Unwatch this directory and generate delete event only for the root dir
foreach (ref const fd_t id, ref const DWFileInfo file; m_dwFiles) {
if (file.folder != wd) continue; // skip those files in another folder than the evented one
bool found;
foreach (Path curr; currFiles[]) {
if (file.path == curr){
curr.normalize();
auto filepath = file.path;
static if (LOG) tracef("curr %s", curr.toNativeString());
if (filepath == curr){
static if (LOG) tracef("curr==file %s", filepath.toNativeString());
found = true;
break;
}
}
// this file/folder was in the folder but it's not there anymore
if (!found) {
static if (LOG) tracef("Deregister file descriptor thr%d %d for %s", *g_threadId, id, file.path.toNativeString());

// writeln("Deleting: ", file.path.toNativeString());
kevent_t _event;
EV_SET(&_event, cast(int) id, EVFILT_VNODE, EV_DELETE, 0, 0, null);
Expand Down Expand Up @@ -3248,7 +3282,7 @@ static if (!EPOLL)
import core.sync.mutex : Mutex;
import core.sync.rwmutex : ReadWriteMutex;
size_t g_evIdxCapacity;
Array!size_t g_evIdxAvailable;
Vector!size_t g_evIdxAvailable;

// called on run
nothrow size_t createIndex() {
Expand Down Expand Up @@ -3304,27 +3338,39 @@ static if (!EPOLL)

size_t* g_threadId;
size_t g_idxCapacity;
Array!size_t g_idxAvailable;
Vector!size_t g_idxAvailable;

__gshared ReadWriteMutex gs_queueMutex;
__gshared Array!(Array!AsyncSignal) gs_signalQueue;
__gshared Array!(Array!size_t) gs_idxQueue; // signals notified

__gshared Vector!(Array!AsyncSignal) gs_signalQueue;
__gshared Vector!(Array!size_t) gs_idxQueue; // signals notified
static ~this() {
g_evIdxAvailable.clear();
g_evIdxAvailable.destroy();
g_idxAvailable.clear();
g_idxAvailable.destroy();
if (g_threadId && *g_threadId == 0) {
gs_signalQueue.clear();
gs_signalQueue.destroy();
gs_idxQueue.destroy();
if (gs_queueMutex)
ThreadMem.free(gs_queueMutex);
gs_queueMutex = null;
}
}

// loop
nothrow bool popSignals(ref AsyncSignal[] sigarr) {
bool more;
try {
foreach (ref AsyncSignal sig; sigarr) {
if (!sig)
break;
sig = null;
}
static if (LOG) tracef("We have %d signals registered", sigarr.length);

size_t len;
synchronized(gs_queueMutex.reader) {

if (gs_idxQueue.length <= *g_threadId || gs_idxQueue[*g_threadId].empty)
static if (LOG) tracef("Idx queue len: %d, thread#%d", gs_idxQueue.length, *g_threadId);
if (gs_idxQueue.length <= *g_threadId || gs_idxQueue[*g_threadId].empty) {
static if (LOG) trace("Idx queue not long enough");
return false;
}

len = gs_idxQueue[*g_threadId].length;
import std.stdio;
Expand All @@ -3338,6 +3384,7 @@ static if (!EPOLL)
sigarr[i] = gs_signalQueue[*g_threadId][idx];
i++;
}
static if (LOG) tracef("Got %d signals", i);
}

synchronized (gs_queueMutex.writer) {
Expand All @@ -3355,6 +3402,7 @@ static if (!EPOLL)
nothrow void addSignal(shared AsyncSignal ctxt) {
try {
size_t thread_id = ctxt.threadId;
static if (LOG) tracef("Adding signal to thread %d", thread_id);
bool must_resize;
import std.stdio;
synchronized (gs_queueMutex.writer) {
Expand Down
2 changes: 1 addition & 1 deletion source/libasync/posix2.d
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ mixin template RunKill()
foreach (wd; remove_list[]) {
unwatch(ctxt.fd, wd); // deletes all related m_dwFolders and m_dwFiles entries
}

ThreadMem.free(m_watchers[ctxt.fd]);
ThreadMem.free(m_changes[ctxt.fd]);
m_watchers.remove(ctxt.fd);
m_changes.remove(ctxt.fd);
Expand Down
1 change: 1 addition & 0 deletions source/libasync/test.d
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void testDirectoryWatcher() {
}
});
g_watcher.watchDir(cache_path);

AsyncTimer tm = new AsyncTimer(g_evl);
tm.duration(1.seconds).run({
writeln("Creating directory ./hey");
Expand Down

0 comments on commit 7d37cbf

Please sign in to comment.