Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor thread #17

Merged
merged 6 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/core_ext/event_loop.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Adds a `current` class accessor to `Crystal::EventLoop`.
abstract class Crystal::EventLoop
# Returns the event loop of the current `ExecutionContext::Scheduler`.
@[AlwaysInline]
def self.current : self
# alternative lookup through the execution context, left disabled:
# ExecutionContext.current.event_loop
ExecutionContext::Scheduler.current.event_loop
end
end
56 changes: 55 additions & 1 deletion src/core_ext/fiber.cr
Original file line number Diff line number Diff line change
@@ -1,12 +1,30 @@
class Fiber
# State of a fiber as reported by `Fiber#status`.
enum Status
# the fiber is alive and its context is marked resumable
Suspended
# the fiber is alive and its context isn't marked resumable
Running
# the fiber is no longer alive
Dead
end

# Returns the fiber that the current thread reports as its current fiber.
def self.current : Fiber
Thread.current.current_fiber
end

# Suspends the current fiber by delegating to `ExecutionContext.reschedule`.
#
# Nothing is armed here to resume the fiber; callers that want to be resumed
# must arrange for it themselves (e.g. `Fiber.yield` adds a resume event
# before suspending).
def self.suspend : Nil
ExecutionContext.reschedule
end

# Gives other fibers a chance to run: arms the current fiber's resume event
# with a zero delay, then suspends the fiber.
#
# The fiber is suspended exactly once, through `Fiber.suspend` (which calls
# `ExecutionContext.reschedule`), the same way `Fiber.maybe_yield` does it.
# Rescheduling a second time after being resumed would suspend the fiber again
# without any resume event armed for it.
def self.yield : Nil
  Crystal.trace :sched, "yield"
  Fiber.current.resume_event.add(0.seconds)
  Fiber.suspend
end

# Yields the current fiber, but only when it has been flagged with
# `#should_yield!`; otherwise returns immediately.
#
# Yielding arms the fiber's resume event with a zero delay and then suspends
# the fiber.
def self.maybe_yield : Nil
  fiber = Fiber.current
  return unless fiber.should_yield?

  Crystal.trace :sched, "yield"
  fiber.resume_event.add(0.seconds)
  Fiber.suspend
end

# def self.timeout(timeout : Time::Span?, select_action : Channel::TimeoutAction? = nil) : Nil
Expand Down Expand Up @@ -71,8 +89,44 @@ class Fiber
# # @execution_context = ExecutionContext.current # <= infinite recursion
# end

# Reports the state of the fiber:
#
# - `Status::Dead` when the fiber is no longer alive;
# - `Status::Suspended` when it is alive and its context is flagged resumable;
# - `Status::Running` otherwise.
def status : Status
  return Status::Dead unless @alive

  @context.@resumable == 1 ? Status::Suspended : Status::Running
end

@should_yield = Atomic(Bool).new(false)

# :nodoc:
#
# Flags a running fiber so that it yields the next time it checks the flag
# (see `Fiber.maybe_yield`).
#
# Returns `true` if the fiber was already told to yield (but still hasn't).
# Returns `false` when this is the first request, or when the fiber isn't
# running, in which case the flag is left untouched.
#
# The result is never `nil`, hence the plain `Bool` return type.
def should_yield! : Bool
  return false unless status.running?

  # swap stores the new value and hands back the previous one
  @should_yield.swap(true, :relaxed)
end

# :nodoc:
#
# Returns the current value of the "should yield" flag (relaxed atomic load).
def should_yield? : Bool
@should_yield.get(:relaxed)
end

# :nodoc:
#
# Resets the "should yield" flag to false (relaxed atomic store).
def clear_should_yield! : Nil
@should_yield.set(false, :relaxed)
end

# Enqueues this fiber into its execution context, then gives the calling
# fiber the opportunity to yield if it has been flagged to do so (see
# `Fiber.maybe_yield`).
def enqueue : Nil
execution_context.enqueue(self)
Fiber.maybe_yield
end

def resume : Nil
Expand Down
14 changes: 8 additions & 6 deletions src/core_ext/stack_pool.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class Fiber::StackPool
@lock = Crystal::SpinLock.new

# OPTIMIZE: collect stacks that haven't been used during the loop interval
# (instead of deallocating half of them arbitrarily).
def collect(count = lazy_size // 2) : Nil
count.times do
break unless stack = @lock.sync { @deque.shift? }
Expand All @@ -9,12 +11,12 @@ class Fiber::StackPool
end

# Runs forever: sleeps for `every`, then calls `collect`. An exception raised
# during an iteration is printed with `Crystal::System.print_exception` and
# the loop carries on.
#
# NOTE(review): the commented-out lines below are an exact copy of the live
# loop. This looks like a rendered diff in which the loop was meant to be
# disabled (`ExecutionContext::Monitor#collect_stacks` also calls
# `stack_pool.collect` periodically) — confirm which version is intended.
def collect_loop(every = 5.seconds) : Nil
loop do
sleep(every)
collect
rescue ex
Crystal::System.print_exception(ex)
end
# loop do
# sleep(every)
# collect
# rescue ex
# Crystal::System.print_exception(ex)
# end
end

def checkout : {Void*, Void*}
Expand Down
4 changes: 4 additions & 0 deletions src/core_ext/thread.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ class Thread
main_fiber.execution_context = execution_context
end

# Yields each thread found in `threads` to the block.
def self.each(&) : Nil
threads.each { |thread| yield thread }
end

# :nodoc:
#
# Stores the given fiber in `@dead_fiber`.
def dead_fiber=(@dead_fiber : Fiber) : Fiber
end
Expand Down
9 changes: 9 additions & 0 deletions src/core_ext/thread_linked_list.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class Thread
class LinkedList(T)
# Yields each node of the list to the block.
#
# The list's mutex is held for the whole iteration (including the time spent
# inside the block), as opposed to `unsafe_each` which this method wraps.
def each(&) : Nil
@mutex.synchronize do
unsafe_each { |node| yield node }
end
end
end
end
6 changes: 6 additions & 0 deletions src/execution_context.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require "./scheduler"
require "./single_threaded"
require "./isolated"
require "./multi_threaded"
require "./monitor"

{% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %}

Expand All @@ -21,6 +22,7 @@ module ExecutionContext
{% else %}
@@default = SingleThreaded.default
{% end %}
@@monitor = Monitor.new
end

# Returns the default number of workers to start in the execution context.
Expand All @@ -42,6 +44,10 @@ module ExecutionContext
@@execution_contexts.try(&.unsafe_each { |execution_context| yield execution_context })
end

# Yields each execution context found in `execution_contexts` to the block.
def self.each(&) : Nil
execution_contexts.each { |execution_context| yield execution_context }
end

@[AlwaysInline]
def self.current : ExecutionContext
Thread.current.execution_context
Expand Down
122 changes: 122 additions & 0 deletions src/monitor.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
module ExecutionContext
# OPTIMIZE: put a low priority on the thread so the OS can schedule it on a
# low power CPU core, rather than a high performance one.
class Monitor
# Tells whether a fixed interval has passed since the timer last fired.
struct Timer
  # Starts counting `every` from the current monotonic time.
  def initialize(@every : Time::Span)
    @last = Time.monotonic
  end

  # Returns true once `now` is at least `@every` past the last time the timer
  # fired (or was created), and restarts the interval from `now`. Returns
  # false, leaving the timer untouched, otherwise.
  def elapsed?(now)
    return false if now < @last + @every

    @last = now
    true
  end
end

DEFAULT_EVERY = 10.milliseconds
DEFAULT_COLLECT_STACKS_EVERY = 5.seconds

# Creates the monitor and immediately starts its thread.
#
# - *every*: interval between two iterations of the monitor loop.
# - *collect_stacks_every*: interval between two collections of unused fiber
#   stacks.
def initialize(
  @every = DEFAULT_EVERY,
  collect_stacks_every = DEFAULT_COLLECT_STACKS_EVERY
)
  @collect_stacks_timer = Timer.new(collect_stacks_every)

  # must be assigned before the thread is started: the thread's loop reads and
  # writes this hash (see #mark_long_running_fibers)
  @running_fibers = {} of Scheduler => {Fiber, Int32}

  # placeholder assignment, presumably so that every instance variable is set
  # before the block below captures `self`
  @thread = uninitialized Thread
  @thread = Thread.new { run_loop }
end

# Body of the monitor thread: on every iteration of `#every`, marks the fibers
# that have been running for too long, then collects unused fiber stacks
# whenever the collect timer has elapsed.
private def run_loop : Nil
every do |now|
mark_long_running_fibers(now)
# TODO: run each EC event-loop every once in a while
collect_stacks if @collect_stacks_timer.elapsed?(now)
end
end

# Yields the current monotonic time to the block roughly every `@every`,
# forever.
#
# The time spent inside the block is deducted from the next sleep, so that
# iterations start at regular intervals (depending on the OS scheduler
# precision and overall OS load). When the block takes longer than `@every`
# the next sleep is clamped to a zero span.
#
# An exception raised during an iteration is reported with
# `Crystal::System.print_error_buffered` and the loop continues; the sleep
# duration is left as it was for that iteration.
#
# OPTIMIZE: consider exponential backoff when all schedulers are pending to
# reduce CPU usage
private def every(&)
remaining = @every

loop do
Thread.sleep(remaining)
now = Time.monotonic
yield(now)
# time left until `now + @every`, never negative
remaining = (now + @every - Time.monotonic).clamp(Time::Span.zero..)
rescue exception
# the message names #run_loop, the caller of this method
Crystal::System.print_error_buffered(
"BUG: %s#run_loop crashed with %s (%s)",
self.class.name,
exception.message,
exception.class.name,
backtrace: exception.backtrace)
end
end

# Iterates each ExecutionContext and calls `collect` on its stack pool to
# release unused Fiber stacks. The whole pass is wrapped in a
# `Crystal.trace :sched, "collect_stacks"` section.
#
# TODO: should maybe happen during GC collections (?)
private def collect_stacks
Crystal.trace :sched, "collect_stacks" do
ExecutionContext.each(&.stack_pool.collect)
end
end

# Iterates each thread's current ExecutionContext::Scheduler and checks for how
# long the current fiber has been running, and tells those that were already
# running during the previous iteration to yield.
#
# Skips `Isolated` contexts where concurrency is disabled. Their fiber is
# expected to run for as long as needed.
#
# At best, a fiber may be noticed right when it started, then found again on
# the next iteration, so it will be asked to yield after running for ~10ms.
#
# At worst, a fiber may start right after an iteration, thus be noticed on
# the next iteration, then asked to yield on the next one, so it will be
# asked to yield after running for ~20ms.
#
# NOTE: fibers are still cooperative, which means they will only yield when
# reaching a yielding point, if it ever reaches such a point.
#
# NOTE: the `now` argument is currently unused; detection relies solely on
# comparing the `{fiber, tick}` pair recorded during the previous iteration.
private def mark_long_running_fibers(now)
Thread.each do |thread|
# only these two scheduler kinds are monitored; any other value (including
# no scheduler at all) skips the thread
case scheduler = thread.current_scheduler?
when MultiThreaded
tick = scheduler.@tick
when SingleThreaded
tick = scheduler.@tick
else
next
end

# never ask a scheduler's main fiber to yield (it mustn't)
running_fiber = thread.current_fiber
next if running_fiber == scheduler.@main_fiber

# using tick to avoid ABA problem: if it changed then the scheduler
# yielded, and just happens to be running the same fiber (don't
# interrupt it)
#
# FIXME: not atomic, tick & current fiber may be out of sync (consider
# DWCAS? TaggedPointer?)
if @running_fibers[scheduler]? == {running_fiber, tick}
# the same fiber has been running continuously since the previous
# loop iteration: tell it to yield

if running_fiber.should_yield!
# TODO: complain that the fiber has already been told to yield (but
# still hasn't)
end
end

# remember what this scheduler is running for the next iteration
@running_fibers[scheduler] = {running_fiber, tick}
end
end
end
end
4 changes: 4 additions & 0 deletions src/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ module ExecutionContext
# well as any other local or instance variables (e.g. we must resolve
# `Thread.current` again)

# that being said, we can still trust the `current_fiber` local variable
# (it's the only exception)
current_fiber.clear_should_yield!

{% unless flag?(:interpreted) %}
if fiber = Thread.current.dead_fiber?
fiber.execution_context.stack_pool.release(fiber.@stack)
Expand Down
Loading