Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print a completion message when all tests are done #192

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
4 changes: 1 addition & 3 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
name = "ReTestItems"
uuid = "817f1d60-ba6b-4fd5-9520-3cf149f6a823"
version = "1.29.0"
version = "1.30.0"

[deps]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
Expand All @@ -15,7 +14,6 @@ TestEnv = "1e6cf692-eddd-4d53-88a5-2d735e33781b"
[compat]
Dates = "1"
Logging = "1"
LoggingExtras = "1"
Pkg = "1"
Profile = "1"
Random = "1"
Expand Down
35 changes: 21 additions & 14 deletions src/ReTestItems.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ using Test: Test, DefaultTestSet, TestSetException
using .Threads: @spawn, nthreads
using Pkg: Pkg
using TestEnv
using Logging
using LoggingExtras
using Logging: current_logger, with_logger

export runtests, runtestitem
export @testsetup, @testitem
Expand Down Expand Up @@ -66,6 +65,7 @@ function softscope_all!(@nospecialize ex)
end
end

include("debug.jl")
include("workers.jl")
using .Workers
include("macros.jl")
Expand Down Expand Up @@ -304,7 +304,7 @@ function runtests(
cfg = _Config(; nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, testitem_failfast, failfast, retries, logs, report, verbose_results, timeout_profile_wait, memory_threshold, gc_between_testitems)
debuglvl = Int(debug)
if debuglvl > 0
LoggingExtras.withlevel(LoggingExtras.Debug; verbosity=debuglvl) do
withdebug(debuglvl) do
_runtests(ti_filter, paths′, cfg)
end
else
Expand Down Expand Up @@ -443,27 +443,30 @@ function _runtests_in_current_env(
ti = starting[i]
@spawn begin
with_logger(original_logger) do
manage_worker($w, $proj_name, $testitems, $ti, $cfg)
manage_worker($w, $proj_name, $testitems, $ti, $cfg; worker_num=$i)
end
end
end
end
Test.TESTSET_PRINT_ENABLE[] = true # reenable printing so our `finish` prints
# Let users know if tests are done, and if all of them ran (or if we failed fast).
# Print this above the final report as there might have been other logs printed
# since a failfast-cancellation was printed, but print it ASAP after tests finish
# in case any of the recording/reporting steps have an issue.
print_completion_summary(testitems; failedfast=(cfg.failfast && is_cancelled(testitems)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, so @Drvi this would be maybe too late already, since we hang before shutting down the workers?

Maybe we actually just want a log in manage_worker, here?:

close(worker)

We could basically say

@info "Finished all tests. Closing Worker $workerid."

?

Copy link
Collaborator Author

@nickrobinson251 nickrobinson251 Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. I just added a log for each worker, so they don't have to coordinate to know if they're the last worker to complete, and also because I suppose any of them could be the one to hang here.

record_results!(testitems)
cfg.report && write_junit_file(proj_name, dirname(projectfile), testitems.graph.junit)
if cfg.failfast && is_cancelled(testitems)
# Let users know if not all tests ran. Print this just above the final report as
# there might have been other logs printed since the cancellation was printed.
print_failfast_summary(testitems)
end
@debugv 1 "Calling Test.finish(testitems)"
Test.finish(testitems) # print summary of total passes/failures/errors
finally
Test.TESTSET_PRINT_ENABLE[] = true
# Cleanup test setup logs
@debugv 1 "Cleaning up test setup logs"
foreach(Iterators.filter(endswith(".log"), readdir(RETESTITEMS_TEMP_FOLDER[], join=true))) do logfile
rm(logfile; force=true) # `force` to ignore error if file already cleaned up
end
@debugv 1 "Done cleaning up test setup logs"
end
@debugv 1 "DONE"
return nothing
end

Expand Down Expand Up @@ -572,16 +575,18 @@ function record_test_error!(testitem, msg, elapsed_seconds::Real=0.0)
return testitem
end

# The provided `worker_num` is only for logging purposes, and not persisted as part of the worker.
function manage_worker(
worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, cfg::_Config,
worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, cfg::_Config;
worker_num::Int
)
ntestitems = length(testitems.testitems)
run_number = 1
memory_threshold_percent = 100 * cfg.memory_threshold
while testitem !== nothing
ch = Channel{TestItemResult}(1)
if memory_percent() > memory_threshold_percent
@warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting worker process to try to free memory."
@warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting process for worker $worker_num to try to free memory."
terminate!(worker)
wait(worker)
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems)
Expand Down Expand Up @@ -637,7 +642,7 @@ function manage_worker(
close(timer)
end
catch e
@debugv 2 "Error" exception=e
@debugv 2 "Error: $e"
# Handle the exception
if e isa TimeoutException
if cfg.timeout_profile_wait > 0
Expand Down Expand Up @@ -679,7 +684,7 @@ function manage_worker(
run_number = 1
else
run_number += 1
@info "Retrying $(repr(testitem.name)) on a new worker process. Run=$run_number."
@info "Retrying $(repr(testitem.name)) on a new worker $worker_num process. Run=$run_number."
end
# The worker was terminated, so replace it unless there are no more testitems to run
if testitem !== nothing
Expand All @@ -689,7 +694,9 @@ function manage_worker(
continue
end
end
@info "All tests on worker $worker_num completed. Closing $worker."
close(worker)
@debugv 1 "Worker $worker_num closed: $(worker)"
return nothing
end

Expand Down
36 changes: 36 additions & 0 deletions src/debug.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Module-wide debug verbosity. `@debugv n ...` prints only when this is at least `n`.
DEBUG_LEVEL::Int = 0

"""
    setdebug!(level::Int)

Set the global `DEBUG_LEVEL` to `level`. Return `nothing`.
"""
function setdebug!(level::Int)
    global DEBUG_LEVEL
    DEBUG_LEVEL = level
    return nothing
end

"""
    withdebug(f, level::Int)

Call `f()` with the global `DEBUG_LEVEL` set to `level`, and put back the value that
was in place beforehand once `f` finishes, whether it returns or throws.
Return whatever `f()` returns.

Supports `do`-block syntax:

    withdebug(level) do
        func()
    end
"""
function withdebug(f, level::Int)
    previous = DEBUG_LEVEL
    try
        setdebug!(level)
        return f()
    finally
        # Runs on both normal return and exception.
        setdebug!(previous)
    end
end

"""
    @debugv level message

Print `message` with `print`, prefixed by `DEBUG @ file:line | ` for the call site,
when the global `DEBUG_LEVEL` is at least `level`.

`level` must be an integer literal. The `message` expression is evaluated in the
caller's scope, and only when the level check passes.

# Examples

    @debugv 1 "msg"
"""
macro debugv(level::Int, message)
    # The call-site location is fixed for a given macro call, so resolve it once at
    # expansion time. `__source__.file` is `nothing` when the macro call carries no
    # source location (e.g. a programmatically built expression).
    file = __source__.file
    filename = file === nothing ? "unknown" : basename(String(file))
    prefix = "DEBUG @ $(filename):$(__source__.line) | "
    quote
        if DEBUG_LEVEL >= $level
            msg = $(esc(message))
            # Build one string so the whole line is handed to `print` in a single call.
            $print($string($prefix, msg, "\n"))
        end
    end
end
4 changes: 4 additions & 0 deletions src/junit_xml.jl
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ function write_junit_file(path::AbstractString, junit::Union{JUnitTestSuites,JUn
open(path, "w") do io
write_junit_file(io, junit)
end
@debugv 1 "Done writing JUnit XML file to $(repr(path))"
return nothing
end

Expand All @@ -201,6 +202,7 @@ function write_junit_file(io::IO, junit::Union{JUnitTestSuites,JUnitTestSuite})
end

function write_junit_xml(io, junit::JUnitTestSuites)
@debugv 2 "Writing JUnit XML for testsuites $(junit.name)"
write(io, "\n<testsuites")
write_counts(io, junit.counts)
write(io, ">")
Expand All @@ -212,6 +214,7 @@ function write_junit_xml(io, junit::JUnitTestSuites)
end

function write_junit_xml(io, ts::JUnitTestSuite)
@debugv 2 "Writing JUnit XML for testsuite $(ts.name)"
write(io, "\n<testsuite name=", xml_markup(ts.name))
write_counts(io, ts.counts)
write(io, ">")
Expand Down Expand Up @@ -258,6 +261,7 @@ function write_dd_tags(io, tc::JUnitTestCase)
end

function write_junit_xml(io, tc::JUnitTestCase)
@debugv 2 "Writing JUnit XML for testcase $(tc.name)"
write(io, "\n\t<testcase name=", xml_markup(tc.name))
write_counts(io, tc.counts)
write(io, ">")
Expand Down
8 changes: 6 additions & 2 deletions src/log_capture.jl
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,13 @@ end
# So that the user is warned that not all tests were run.
# We don't use loglock here, because this is only called once on the coordinator after all
# tasks running tests have stopped and we're printing the final test report.
function print_failfast_summary(t::TestItems)
function print_completion_summary(t::TestItems; failedfast::Bool)
io = DEFAULT_STDOUT[]
printstyled(io, "[ Fail Fast: "; bold=true, color=Base.warn_color())
if failedfast
printstyled(io, "[ Fail Fast: "; bold=true, color=Base.warn_color())
else
printstyled(io, "[ Tests Completed: "; bold=true, color=Base.info_color())
end
println(io, "$(t.count)/$(length(t.testitems)) test items were run.")
return nothing
end
Expand Down
5 changes: 4 additions & 1 deletion src/testcontext.jl
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,12 @@ is_cancelled(t::TestItems) = @atomic t.cancelled
###

# Record results for each child in `ti.graph.children` into `ti.graph`, emitting a
# level-1 debug message before and after. Returns `ti`.
function record_results!(ti::TestItems)
    @debugv 1 "Recording testitem results"
    for child in ti.graph.children
        record_results!(ti.graph, child)
    end
    @debugv 1 "Done recording testitem results"
    return ti
end

function record_results!(dir::DirNode, child_dir::DirNode)
Expand Down Expand Up @@ -153,7 +156,7 @@ function get_starting_testitems(ti::TestItems, n)
len = length(ti.testitems)
step = max(1, len / n)
testitems = [ti.testitems[round(Int, i)] for i in 1:step:len]
@debugv 2 "get_starting_testitems" len n allunique(testitems)
@debugv 2 "get_starting_testitems len=$len n=$n allunique=$(allunique(testitems))"
@assert length(testitems) == min(n, len) && allunique(testitems)
for (i, t) in enumerate(testitems)
@atomic t.scheduled_for_evaluation.value = true
Expand Down
18 changes: 10 additions & 8 deletions src/workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ end
function terminate!(w::Worker, from::Symbol=:manual)
already_terminated = @atomicswap :monotonic w.terminated = true
if !already_terminated
@debug "terminating worker $(w.pid) from $from"
@debug "terminating $(w) from $(from)"
end
wte = WorkerTerminatedException(w)
@lock w.lock begin
Expand Down Expand Up @@ -114,7 +114,7 @@ end
# Called when timeout_profile_wait is non-zero.
function trigger_profile(w::Worker, timeout_profile_wait, from::Symbol=:manual)
if !Sys.iswindows()
@debug "sending profile request to worker $(w.pid) from $from"
@debug "sending profile request to $(w) from $(from)"
if Sys.islinux()
kill(w.process, 10) # SIGUSR1
elseif Sys.isbsd()
Expand All @@ -128,13 +128,15 @@ end
# Gracefully terminate a worker: send it a shutdown message (skipped when the worker
# is already terminated or its socket is no longer open), then wait for the other
# tasks to perform the worker shutdown.
function Base.close(w::Worker)
    @debug "closing $(w)"
    can_notify = !w.terminated && isopen(w.socket)
    if can_notify
        shutdown_req = Request(Symbol(), :(), rand(UInt64), true)
        @lock w.lock begin
            serialize(w.socket, shutdown_req)
            flush(w.socket)
        end
    end
    @debug "waiting for $(w) to terminate"
    wait(w)
    return nothing
end
Expand Down Expand Up @@ -231,7 +233,7 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event)
end
end
catch e
# @error "Error redirecting worker output $(w.pid)" exception=(e, catch_backtrace())
# @error "Error redirecting $(w) output" exception=(e, catch_backtrace())
terminate!(w, :redirect_worker_output)
e isa EOFError || e isa Base.IOError || rethrow()
finally
Expand All @@ -250,13 +252,13 @@ function process_responses(w::Worker, ev::Threads.Event)
while isopen(w.socket) && !w.terminated
# get the next Response from the worker
r = deserialize(w.socket)
@assert r isa Response "Received invalid response from worker $(w.pid): $(r)"
# println("Received response $(r) from worker $(w.pid)")
@assert r isa Response "Received invalid response from $(w): $(r)"
# println("Received response $(r) from $(w)")
@lock lock begin
@assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from worker $(w.pid)"
@assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from $(w)"
# look up the Future for this request
fut = pop!(reqs, r.id)
@assert !isready(fut.value) "Received duplicate response for request $(r.id) from worker $(w.pid)"
@assert !isready(fut.value) "Received duplicate response for request $(r.id) from $(w)"
if r.error !== nothing
# this allows rethrowing the exception from the worker to the caller
close(fut.value, r.error)
Expand All @@ -266,7 +268,7 @@ function process_responses(w::Worker, ev::Threads.Event)
end
end
catch e
# @error "Error processing responses from worker $(w.pid)" exception=(e, catch_backtrace())
# @error "Error processing responses from $(w)" exception=(e, catch_backtrace())
terminate!(w, :process_responses)
e isa EOFError || e isa Base.IOError || rethrow()
end
Expand Down
40 changes: 27 additions & 13 deletions test/integrationtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,24 @@ end

# test we can call runtests manually w/ directory
@testset "manual `runtests(dir)`" begin
results = encased_testset() do
runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl"))
using IOCapture
c = IOCapture.capture() do
encased_testset(() -> runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl")))
end
results = c.value
@test n_passed(results) == 2 # NoDeps has two test files with a test each
@test contains(c.output, "[ Tests Completed: 2/2 test items were run.")
end

@testset "manual `runtests(file)`" begin
# test we can point to a file at the base of the package (not just in `src` or `test`)
results = encased_testset() do
runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl", "toplevel_tests.jl"))
using IOCapture
c = IOCapture.capture() do
encased_testset(() -> runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl", "toplevel_tests.jl")))
end
results = c.value
@test n_passed(results) == 1
@test contains(c.output, "[ Tests Completed: 1/1 test items were run.")
end

@testset "`runtests(path)` auto finds testsetups" begin
Expand Down Expand Up @@ -273,20 +279,28 @@ end
nworkers = 2
@testset "runtests with nworkers = $nworkers" verbose=true begin
@testset "Pkg.test() $pkg" for pkg in TEST_PKGS
results = with_test_package(pkg) do
withenv("RETESTITEMS_NWORKERS" => nworkers) do
Pkg.test()
c = IOCapture.capture() do
with_test_package(pkg) do
withenv("RETESTITEMS_NWORKERS" => nworkers) do
Pkg.test()
end
end
end
results = c.value
@test all_passed(results)
@test contains(c.output, "[ Tests Completed")
end
@testset "Pkg.test() DontPass.jl" begin
results = with_test_package("DontPass.jl") do
withenv("RETESTITEMS_NWORKERS" => 2) do
Pkg.test()
c = IOCapture.capture() do
with_test_package("DontPass.jl") do
withenv("RETESTITEMS_NWORKERS" => 2) do
Pkg.test()
end
end
end
results = c.value
@test length(non_passes(results)) > 0
@test contains(c.output, "[ Tests Completed")
end
end

Expand Down Expand Up @@ -447,9 +461,9 @@ end
@test !contains(c.output, "tests done")
end
if debug
@test contains(c.output, "Debug:")
@test contains(c.output, "DEBUG @")
else
@test !contains(c.output, "Debug:")
@test !contains(c.output, "DEBUG @")
end
# Test we have the expected summary table
testset = c.value
Expand Down Expand Up @@ -1198,7 +1212,7 @@ end
# monkey-patch the internal `memory_percent` function to return a fixed value, so we
# can control if we hit the `memory_threshold`.
@eval ReTestItems.memory_percent() = 83.1
expected_warning = "Warning: Memory usage (83.1%) is higher than threshold (7.0%). Restarting worker process to try to free memory."
expected_warning = "Warning: Memory usage (83.1%) is higher than threshold (7.0%). Restarting process for worker 1 to try to free memory."

# Pass `memory_threshold` keyword, and hit the memory threshold.
c1 = IOCapture.capture() do
Expand Down
Loading