From 0170b7dc6ac445e5dc0f1f26574731ae4e535279 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Wed, 8 Jan 2025 00:44:16 +0100 Subject: [PATCH] Specialize remotecall_pool(remotecall) to wait for the remotecall Otherwise the worker would prematurely be put back into the pool, causing oversubscription. Also added a warning about oversubscription to the docstring for `remote_do(f, ::AbstractWorkerPool)`. --- docs/src/_changelog.md | 4 ++++ src/workerpool.jl | 26 ++++++++++++++++++++++++++ test/distributed_exec.jl | 13 +++++++++++++ 3 files changed, 43 insertions(+) diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index 5d9207f..3ef0590 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -11,6 +11,10 @@ This documents notable changes in DistributedNext.jl. The format is based on ### Fixed - Fixed a cause of potential hangs when exiting the process ([#16]). +- Fixed a subtle bug in `remotecall(f, ::AbstractWorkerPool)`, previously the + implementation would take a worker out of the pool and immediately put it back + in without waiting for the returned [`Future`](@ref). Now it will wait for the + `Future` before putting the worker back in the pool ([#20]). ### Added - A watcher mechanism has been added to detect when both the Distributed stdlib diff --git a/src/workerpool.jl b/src/workerpool.jl index 92b02ff..6d03bc9 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -135,6 +135,28 @@ function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...) end end +# Specialization for remotecall. We have to wait for the Future it returns +# before putting the worker back in the pool. +function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool, args...; kwargs...) + worker = take!(pool) + local x + try + x = rc_f(f, worker, args...; kwargs...) + catch + put!(pool, worker) + rethrow() + end + + t = Threads.@spawn try + wait(x) + finally + put!(pool, worker) + end + errormonitor(t) + + return x +end + # Check if pool is local or remote and forward calls if required. # NOTE: remotecall_fetch does it automatically, but this will be more efficient as # it avoids the overhead associated with a local remotecall. @@ -242,6 +264,10 @@ remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_p [`WorkerPool`](@ref) variant of `remote_do(f, pid, ....)`. Wait for and take a free worker from `pool` and perform a `remote_do` on it. + +Note that it's not possible to wait for the result of a `remote_do()` to finish +so the worker will immediately be put back in the pool (i.e. potentially causing +oversubscription). """ remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remote_do, f, pool, args...; kwargs...) diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 8bfc462..23fba89 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1014,6 +1014,19 @@ f16091b = () -> 1 @test_throws RemoteException fetch(ref) end + # Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should + # keep the worker out of the pool until the underlying remotecall has + # finished. + remotechan = RemoteChannel(wrkr1) + pool = WorkerPool([wrkr1]) + put_future = remotecall(() -> wait(remotechan), pool) + @test !isready(pool) + put!(remotechan, 1) + wait(put_future) + # The task that waits on the future to put it back into the pool runs + # asynchronously so we use timedwait() to check when the worker is back in. + @test timedwait(() -> isready(pool), 10) === :ok + # Test calling @everywhere from a module not defined on the workers LocalBar.bar() for p in procs()