From 3e14951793a5418f916741b9b29a3b60cacaa5a4 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Wed, 4 Dec 2024 12:35:56 +0100 Subject: [PATCH 01/11] Skeleton for ExecutionContext types as per RFC #0002 - Add the `ExecutionContext` module; - Add the `ExecutionContext::Scheduler` module; - Add the `execution_context` compile-time flag. When the `execution_context` flag is set: - Don't load `Crystal::Scheduler`; - Plug `ExecutionContext` instead of `Crystal::Scheduler` in `spawn`, `Fiber`, ... This is only the skeleton: there are no implementations (yet). Trying to compile anything with `-Dexecution_context` will obviously fail for the time being. --- src/concurrent.cr | 41 ++++++--- src/crystal/event_loop.cr | 14 ++- src/crystal/event_loop/polling.cr | 18 +++- src/crystal/scheduler.cr | 3 +- src/crystal/system/thread.cr | 41 +++++++-- src/crystal/system/unix/signal.cr | 1 + src/crystal/tracing.cr | 10 +++ src/execution_context/execution_context.cr | 100 +++++++++++++++++++++ src/execution_context/scheduler.cr | 83 +++++++++++++++++ src/fiber.cr | 73 +++++++++++++-- src/io/evented.cr | 12 ++- src/kernel.cr | 6 +- 12 files changed, 365 insertions(+), 37 deletions(-) create mode 100644 src/execution_context/execution_context.cr create mode 100644 src/execution_context/scheduler.cr diff --git a/src/concurrent.cr b/src/concurrent.cr index 07ae945a84f6..1f1ad04bfd06 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -1,8 +1,13 @@ require "fiber" require "channel" -require "crystal/scheduler" require "crystal/tracing" +{% if flag?(:execution_context) %} + require "execution_context" +{% else %} + require "crystal/scheduler" +{% end %} + # Blocks the current fiber for the specified number of seconds. 
# # While this fiber is waiting this time, other ready-to-execute @@ -12,8 +17,7 @@ def sleep(seconds : Number) : Nil if seconds < 0 raise ArgumentError.new "Sleep seconds must be positive" end - - Crystal::Scheduler.sleep(seconds.seconds) + sleep(seconds.seconds) end # Blocks the current Fiber for the specified time span. @@ -21,16 +25,28 @@ end # While this fiber is waiting this time, other ready-to-execute # fibers might start their execution. def sleep(time : Time::Span) : Nil - Crystal::Scheduler.sleep(time) + Crystal.trace :sched, "sleep", for: time + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(time) + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.sleep(time) + {% end %} end # Blocks the current fiber forever. # # Meanwhile, other ready-to-execute fibers might start their execution. def sleep : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end +{% begin %} # Spawns a new fiber. # # NOTE: The newly created fiber doesn't run as soon as spawned. @@ -64,12 +80,17 @@ end # wg.wait # ``` def spawn(*, name : String? = nil, same_thread = false, &block) - fiber = Fiber.new(name, &block) - Crystal.trace :sched, "spawn", fiber: fiber - {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} - fiber.enqueue - fiber + {% if flag?(:execution_context) %} + ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block) + {% else %} + fiber = Fiber.new(name, &block) + Crystal.trace :sched, "spawn", fiber: fiber + {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} + fiber.enqueue + fiber + {% end %} end +{% end %} # Spawns a fiber by first creating a `Proc`, passing the *call*'s # expressions to it, and letting the `Proc` finally invoke the *call*. 
diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 00bcb86040b6..0294abc2ef3d 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -27,12 +27,20 @@ abstract class Crystal::EventLoop @[AlwaysInline] def self.current : self - Crystal::Scheduler.event_loop + {% if flag?(:execution_context) %} + ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop + {% end %} end @[AlwaysInline] - def self.current? : self? - Crystal::Scheduler.event_loop? + def self.current? : self | Nil + {% if flag?(:execution_context) %} + ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop? + {% end %} end # Runs the loop. diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 2fe86ad5b649..f7fc36082a0e 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -115,7 +115,11 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop # NOTE: thread unsafe def run(blocking : Bool) : Bool system_run(blocking) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end true end @@ -303,13 +307,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop Polling.arena.free(index) do |pd| pd.value.@readers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end pd.value.@writers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end diff --git a/src/crystal/scheduler.cr 
b/src/crystal/scheduler.cr index efee6b3c06f1..6cc13406ea4a 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -1,3 +1,5 @@ +{% skip_file if flag?(:execution_context) %} + require "crystal/event_loop" require "crystal/system/print_error" require "fiber" @@ -66,7 +68,6 @@ class Crystal::Scheduler end def self.sleep(time : Time::Span) : Nil - Crystal.trace :sched, "sleep", for: time Thread.current.scheduler.sleep(time) end diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 92136d1f3989..2b5e06498798 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -68,6 +68,39 @@ class Thread getter name : String? + {% if flag?(:execution_context) %} + # :nodoc: + getter! execution_context : ExecutionContext + + # :nodoc: + property! current_scheduler : ExecutionContext::Scheduler + + # :nodoc: + def execution_context=(@execution_context : ExecutionContext) : ExecutionContext + main_fiber.execution_context = execution_context + end + + # :nodoc: + def dead_fiber=(@dead_fiber : Fiber) : Fiber + end + + # :nodoc: + def dead_fiber? : Fiber? + if fiber = @dead_fiber + @dead_fiber = nil + fiber + end + end + {% else %} + # :nodoc: + getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } + + # :nodoc: + def scheduler? : ::Crystal::Scheduler? + @scheduler + end + {% end %} + def self.unsafe_each(&) # nothing to iterate when @@threads is nil + don't lazily allocate in a # method called from a GC collection callback! @@ -154,14 +187,6 @@ class Thread thread.name = name end - # :nodoc: - getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } - - # :nodoc: - def scheduler? : ::Crystal::Scheduler? 
- @scheduler - end - protected def start Thread.threads.push(self) Thread.current = self diff --git a/src/crystal/system/unix/signal.cr b/src/crystal/system/unix/signal.cr index 12804ea00267..f3c28b6f5a88 100644 --- a/src/crystal/system/unix/signal.cr +++ b/src/crystal/system/unix/signal.cr @@ -2,6 +2,7 @@ require "c/signal" require "c/stdio" require "c/sys/wait" require "c/unistd" +require "../print_error" module Crystal::System::Signal # The number of libc functions that can be called safely from a signal(2) diff --git a/src/crystal/tracing.cr b/src/crystal/tracing.cr index d9508eda85a8..a6c1f747625f 100644 --- a/src/crystal/tracing.cr +++ b/src/crystal/tracing.cr @@ -81,6 +81,16 @@ module Crystal write value.name || '?' end + {% if flag?(:execution_context) %} + def write(value : ExecutionContext) : Nil + write value.name + end + + def write(value : ExecutionContext::Scheduler) : Nil + write value.name + end + {% end %} + def write(value : Pointer) : Nil write "0x" System.to_int_slice(value.address, 16, true, 2) { |bytes| write(bytes) } diff --git a/src/execution_context/execution_context.cr b/src/execution_context/execution_context.cr new file mode 100644 index 000000000000..4342945e3812 --- /dev/null +++ b/src/execution_context/execution_context.cr @@ -0,0 +1,100 @@ +require "../crystal/event_loop" +require "../crystal/system/thread" +require "../crystal/system/thread_linked_list" +require "../fiber" +require "../fiber/stack_pool" +require "./scheduler" + +{% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %} + +module ExecutionContext + @@default : ExecutionContext? 
+ + @[AlwaysInline] + def self.default : ExecutionContext + @@default.not_nil!("expected default execution context to have been setup") + end + + # :nodoc: + def self.init_default_context : Nil + raise NotImplementedError.new("No execution context implementations (yet)") + end + + # Returns the default number of workers to start in the execution context. + def self.default_workers_count : Int32 + ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || Math.min(System.cpu_count.to_i, 32) + end + + # :nodoc: + protected class_getter(execution_contexts) { Thread::LinkedList(ExecutionContext).new } + + # :nodoc: + property next : ExecutionContext? + + # :nodoc: + property previous : ExecutionContext? + + # :nodoc: + def self.unsafe_each(&) : Nil + @@execution_contexts.try(&.unsafe_each { |execution_context| yield execution_context }) + end + + def self.each(&) : Nil + execution_contexts.each { |execution_context| yield execution_context } + end + + @[AlwaysInline] + def self.current : ExecutionContext + Thread.current.execution_context + end + + # Tells the current scheduler to suspend the current fiber and resume the + # next runnable fiber. The current fiber will never be resumed; you're + # responsible to reenqueue it. + # + # This method is safe as it only operates on the current `ExecutionContext` + # and `Scheduler`. + @[AlwaysInline] + def self.reschedule : Nil + Scheduler.current.reschedule + end + + # Tells the current scheduler to suspend the current fiber and to resume + # *fiber* instead. The current fiber will never be resumed; you're responsible + # to reenqueue it. + # + # Raises `RuntimeError` if the fiber doesn't belong to the current execution + # context. + # + # This method is safe as it only operates on the current `ExecutionContext` + # and `Scheduler`. 
+ def self.resume(fiber : Fiber) : Nil + if fiber.execution_context == current + Scheduler.current.resume(fiber) + else + raise RuntimeError.new("Can't resume fiber from #{fiber.execution_context} into #{current}") + end + end + + # Creates a new fiber then calls `#enqueue` to add it to the execution + # context. + # + # May be called from any `ExecutionContext` (i.e. must be thread-safe). + def spawn(*, name : String? = nil, &block : ->) : Fiber + Fiber.new(name, self, &block).tap { |fiber| enqueue(fiber) } + end + + # Legacy support for the `same_thread` argument. Each execution context may + # decide to support it or not (e.g. a single threaded context can accept it). + abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + + abstract def stack_pool : Fiber::StackPool + abstract def stack_pool? : Fiber::StackPool? + + abstract def event_loop : Crystal::EventLoop + + # Enqueues a fiber to be resumed inside the execution context. + # + # May be called from any ExecutionContext (i.e. must be thread-safe). + abstract def enqueue(fiber : Fiber) : Nil +end diff --git a/src/execution_context/scheduler.cr b/src/execution_context/scheduler.cr new file mode 100644 index 000000000000..fe5acab96500 --- /dev/null +++ b/src/execution_context/scheduler.cr @@ -0,0 +1,83 @@ +module ExecutionContext + module Scheduler + @[AlwaysInline] + def self.current : Scheduler + Thread.current.current_scheduler + end + + protected abstract def thread : Thread + protected abstract def execution_context : ExecutionContext + + # Instantiates a fiber and enqueues it into the scheduler's local queue. + def spawn(*, name : String? = nil, &block : ->) : Fiber + fiber = Fiber.new(name, execution_context, &block) + enqueue(fiber) + fiber + end + + # Legacy support for the *same_thread* argument. Each execution context may + # decide to support it or not (e.g. a single threaded context can accept it). + abstract def spawn(*, name : String? 
= nil, same_thread : Bool, &block : ->) : Fiber + + # Suspends the execution of the current fiber and resumes the next runnable + # fiber. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.reschedule` instead. + protected abstract def enqueue(fiber : Fiber) : Nil + + # Suspends the execution of the current fiber and resumes the next runnable + # fiber. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.reschedule` instead. + protected abstract def reschedule : Nil + + # Suspends the execution of the current fiber and resumes *fiber*. + # + # The current fiber will never be resumed; you're responsible to reenqueue + # it. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.resume` instead. + protected abstract def resume(fiber : Fiber) : Nil + + # Switches the thread from running the current fiber to run *fiber* instead. + # + # Handles thread safety around fiber stacks: locks the GC to not start a + # collection while we're switching context, releases the stack of a dead + # fiber. + # + # Unsafe. Must only be called by the current scheduler. Caller must ensure + # that the fiber indeed belongs to the current execution context, and that + # the fiber can indeed be resumed. + protected def swapcontext(fiber : Fiber) : Nil + current_fiber = thread.current_fiber + + {% unless flag?(:interpreted) %} + thread.dead_fiber = current_fiber if current_fiber.dead? + {% end %} + + GC.lock_read + thread.current_fiber = fiber + Fiber.swapcontext(pointerof(current_fiber.@context), pointerof(fiber.@context)) + GC.unlock_read + + # we switched context so we can't trust *self* anymore (it is the + # scheduler that rescheduled *fiber* which may be another scheduler) as + # well as any other local or instance variables (e.g. 
we must resolve + # `Thread.current` again) + # + # that being said, we can still trust the *current_fiber* local variable + # (it's the only exception) + + {% unless flag?(:interpreted) %} + if fiber = Thread.current.dead_fiber? + fiber.execution_context.stack_pool.release(fiber.@stack) + end + {% end %} + end + + abstract def status : String + end +end diff --git a/src/fiber.cr b/src/fiber.cr index b34a8762037d..39a9f2bf2b85 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -59,7 +59,10 @@ class Fiber property name : String? @alive = true - {% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread = Atomic(Thread?).new(nil) + {% end %} # :nodoc: property next : Fiber? @@ -67,6 +70,10 @@ class Fiber # :nodoc: property previous : Fiber? + {% if flag?(:execution_context) %} + property! execution_context : ExecutionContext + {% end %} + # :nodoc: def self.inactive(fiber : Fiber) fibers.delete(fiber) @@ -84,16 +91,19 @@ class Fiber fibers.each { |fiber| yield fiber } end + {% begin %} # Creates a new `Fiber` instance. # # When the fiber is executed, it runs *proc* in its context. # # *name* is an optional and used only as an internal reference. - def initialize(@name : String? = nil, &@proc : ->) + def initialize(@name : String? 
= nil, {% if flag?(:execution_context) %}@execution_context : ExecutionContext = ExecutionContext.current,{% end %} &@proc : ->) @context = Context.new @stack, @stack_bottom = {% if flag?(:interpreted) %} {Pointer(Void).null, Pointer(Void).null} + {% elsif flag?(:execution_context) %} + execution_context.stack_pool.checkout {% else %} Crystal::Scheduler.stack_pool.checkout {% end %} @@ -123,6 +133,7 @@ class Fiber Fiber.fibers.push(self) end + {% end %} # :nodoc: def initialize(@stack : Void*, thread) @@ -139,13 +150,30 @@ class Fiber {% end %} thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom @name = "main" - {% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread.set(thread) + {% end %} + Fiber.fibers.push(self) + + # we don't initialize @execution_context here (we may not have an execution + # context yet), and we can't detect ExecutionContext.current (we may reach + # an infinite recursion). end # :nodoc: def run GC.unlock_read + + {% if flag?(:execution_context) && !flag?(:interpreted) %} + # if the fiber previously running on this thread has terminated, we can + # now safely release its stack + if fiber = Thread.current.dead_fiber? + fiber.execution_context.stack_pool.release(fiber.@stack) + end + {% end %} + @proc.call rescue ex if name = @name @@ -163,9 +191,17 @@ class Fiber @timeout_select_action = nil @alive = false - {% unless flag?(:interpreted) %} + + {% unless flag?(:interpreted) || flag?(:execution_context) %} + # interpreted: the interpreter is managing the stacks + # + # execution context: do not prematurely release the stack before we switch + # to another fiber so we don't end up with a thread reusing a stack for a + # new fiber while the current fiber isn't fully terminated (oops); even + # without the pool, we can't unmap before we swap context. 
Crystal::Scheduler.stack_pool.release(@stack) {% end %} + Fiber.suspend end @@ -207,7 +243,11 @@ class Fiber # puts "never reached" # ``` def resume : Nil - Crystal::Scheduler.resume(self) + {% if flag?(:execution_context) %} + ExecutionContext.resume(self) + {% else %} + Crystal::Scheduler.resume(self) + {% end %} end # Adds this fiber to the scheduler's runnables queue for the current thread. @@ -216,7 +256,11 @@ class Fiber # the next time it has the opportunity to reschedule to another fiber. There # are no guarantees when that will happen. def enqueue : Nil - Crystal::Scheduler.enqueue(self) + {% if flag?(:execution_context) %} + execution_context.enqueue(self) + {% else %} + Crystal::Scheduler.enqueue(self) + {% end %} end # :nodoc: @@ -284,7 +328,14 @@ class Fiber # end # ``` def self.yield : Nil - Crystal::Scheduler.yield + Crystal.trace :sched, "yield" + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(0.seconds) + Fiber.suspend + {% else %} + Crystal::Scheduler.yield + {% end %} end # Suspends execution of the current fiber indefinitely. @@ -298,7 +349,11 @@ class Fiber # useful if the fiber needs to wait for something to happen (for example an IO # event, a message is ready in a channel, etc.) which triggers a re-enqueue. 
def self.suspend : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end def to_s(io : IO) : Nil @@ -320,7 +375,7 @@ class Fiber GC.push_stack @context.stack_top, @stack_bottom end - {% if flag?(:preview_mt) %} + {% if flag?(:preview_mt) && !flag?(:execution_context) %} # :nodoc: def set_current_thread(thread = Thread.current) : Thread @current_thread.set(thread) diff --git a/src/io/evented.cr b/src/io/evented.cr index 1f95d1870b0b..b238830f284a 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -89,11 +89,19 @@ module IO::Evented @write_event.consume_each &.free @readers.consume_each do |readers| - Crystal::Scheduler.enqueue readers + {% if flag?(:execution_context) %} + readers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue readers + {% end %} end @writers.consume_each do |writers| - Crystal::Scheduler.enqueue writers + {% if flag?(:execution_context) %} + writers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue writers + {% end %} end end diff --git a/src/kernel.cr b/src/kernel.cr index 34763b994839..c2af8771824e 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -608,7 +608,11 @@ end Exception::CallStack.load_debug_info if ENV["CRYSTAL_LOAD_DEBUG_INFO"]? 
== "1" Exception::CallStack.setup_crash_handler - Crystal::Scheduler.init + {% if flag?(:execution_context) %} + ExecutionContext.init_default_context + {% else %} + Crystal::Scheduler.init + {% end %} {% if flag?(:win32) %} Crystal::System::Process.start_interrupt_loop From 35da3f926ba1ee43ea89470293bd47a85ca7cf05 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Thu, 5 Dec 2024 16:46:39 +0100 Subject: [PATCH 02/11] Add thread safety to Fiber::StackPool The current ST and MT schedulers use a distinct pool per thread, which means we only need the thread safety for execution contexts that will share a single pool for a whole context. --- src/fiber/stack_pool.cr | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr index 8f809335f46c..062785b61313 100644 --- a/src/fiber/stack_pool.cr +++ b/src/fiber/stack_pool.cr @@ -13,6 +13,10 @@ class Fiber # pointer value) rather than downwards, so *protect* must be false. def initialize(@protect : Bool = true) @deque = Deque(Void*).new + + {% if flag?(:execution_context) %} + @lock = Crystal::SpinLock.new + {% end %} end def finalize @@ -25,7 +29,7 @@ class Fiber # returning memory to the operating system. def collect(count = lazy_size // 2) : Nil count.times do - if stack = @deque.shift? + if stack = shift? Crystal::System::Fiber.free_stack(stack, STACK_SIZE) else return @@ -42,7 +46,7 @@ class Fiber # Removes a stack from the bottom of the pool, or allocates a new one. def checkout : {Void*, Void*} - if stack = @deque.pop? + if stack = pop? Crystal::System::Fiber.reset_stack(stack, STACK_SIZE, @protect) else stack = Crystal::System::Fiber.allocate_stack(STACK_SIZE, @protect) @@ -52,7 +56,11 @@ class Fiber # Appends a stack to the bottom of the pool. 
def release(stack) : Nil - @deque.push(stack) + {% if flag?(:execution_context) %} + @lock.sync { @deque.push(stack) } + {% else %} + @deque.push(stack) + {% end %} end # Returns the approximated size of the pool. It may be equal or slightly @@ -60,5 +68,21 @@ class Fiber def lazy_size : Int32 @deque.size end + + private def shift? + {% if flag?(:execution_context) %} + @lock.sync { @deque.shift? } unless @deque.empty? + {% else %} + @deque.shift? + {% end %} + end + + private def pop? + {% if flag?(:execution_context) %} + @lock.sync { @deque.pop? } unless @deque.empty? + {% else %} + @deque.pop? + {% end %} + end end end From 3852ce4b242167e5585deea48e07b8f1cc1f19a6 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 6 Dec 2024 15:01:08 +0100 Subject: [PATCH 03/11] Alt EventLoop#run(queue*, blocking) method The point is to avoid parallel enqueues while running the event loop, so we get better control over where and how the runnable fibers are enqueued; for example all at once instead of one by one (may not be as effective as it sounds). More importantly for Execution Contexts: it avoids parallel enqueues while the event loop is running which sometimes leads to confusing behavior; for example when deciding to wake up a scheduler/thread we mustn't interrupt the event loop (obviously). This is working correctly for the Polling (Epoll, Kqueue) and IOCP event loop implementations. I'm less confident with the libevent one where the external library executes arbitrary callbacks. 
--- src/crystal/event_loop.cr | 7 +++++ src/crystal/event_loop/iocp.cr | 8 ++++++ src/crystal/event_loop/libevent.cr | 43 +++++++++++++++++++++++++----- src/crystal/event_loop/polling.cr | 9 ++++++- src/io/evented.cr | 14 ++++++++-- 5 files changed, 71 insertions(+), 10 deletions(-) diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 0294abc2ef3d..1b2887a1c535 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -54,6 +54,13 @@ abstract class Crystal::EventLoop # events. abstract def run(blocking : Bool) : Bool + {% if flag?(:execution_context) %} + # Same as `#run` but collects runnable fibers into *queue* instead of + # enqueueing in parallel, so the caller is responsible and in control for + # when and how the fibers will be enqueued. + abstract def run(queue : Fiber::Queue*, blocking : Bool) : Nil + {% end %} + # Tells a blocking run loop to no longer wait for events to activate. It may # for example enqueue a NOOP event with an immediate (or past) timeout. Having # activated an event, the loop shall return, allowing the blocked thread to diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index 6e4175e3daee..3c84b2ce927d 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -55,6 +55,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop iocp end + # thread unsafe def run(blocking : Bool) : Bool enqueued = false @@ -66,6 +67,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop enqueued end + {% if flag?(:execution_context) %} + # thread unsafe + def run(queue : Fiber::Queue*, blocking : Bool) : Nil + run_impl(blocking) { |fiber| queue.value.push(fiber) } + end + {% end %} + # Runs the event loop and enqueues the fiber for the next upcoming event or # completion. 
private def run_impl(blocking : Bool, &) : Nil diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index 9c0b3d33b15c..b6a2f1baa745 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -20,26 +20,55 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop event_base.loop(flags) end + {% if flag?(:execution_context) %} + def run(queue : Fiber::Queue*, blocking : Bool) : Nil + Crystal.trace :evloop, "run", blocking: blocking + @runnables = queue + run(blocking) + ensure + @runnables = nil + end + + def callback_enqueue(fiber : Fiber) : Nil + if queue = @runnables + queue.value.push(fiber) + else + raise "BUG: libevent callback executed outside of #run(queue*, blocking) call" + end + end + {% end %} + def interrupt : Nil event_base.loop_exit end - # Create a new resume event for a fiber. + # Create a new resume event for a fiber (sleep). def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| - data.as(Fiber).enqueue + f = data.as(Fiber) + {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} end end - # Creates a timeout_event. + # Creates a timeout event (timeout action of select expression). def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| f = data.as(Fiber) - if (select_action = f.timeout_select_action) + if select_action = f.timeout_select_action f.timeout_select_action = nil - select_action.time_expired(f) - else - f.enqueue + if select_action.time_expired? 
+ {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} + end end end end diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index f7fc36082a0e..24d94af02bcc 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -112,7 +112,7 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop end {% end %} - # NOTE: thread unsafe + # thread unsafe def run(blocking : Bool) : Bool system_run(blocking) do |fiber| {% if flag?(:execution_context) %} @@ -124,6 +124,13 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop true end + {% if flag?(:execution_context) %} + # thread unsafe + def run(queue : Fiber::Queue*, blocking : Bool) : Nil + system_run(blocking) { |fiber| queue.value.push(fiber) } + end + {% end %} + # fiber interface, see Crystal::EventLoop def create_resume_event(fiber : Fiber) : FiberEvent diff --git a/src/io/evented.cr b/src/io/evented.cr index b238830f284a..db601a83964f 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -20,7 +20,12 @@ module IO::Evented @read_timed_out = timed_out if reader = @readers.get?.try &.shift? - reader.enqueue + {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(reader) + {% else %} + reader.enqueue + {% end %} end end @@ -29,7 +34,12 @@ module IO::Evented @write_timed_out = timed_out if writer = @writers.get?.try &.shift? 
- writer.enqueue + {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(writer) + {% else %} + writer.enqueue + {% end %} end end From 06836950d1f3e128c5988fc75051de0bafadfb05 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 17 Jan 2025 14:09:00 +0100 Subject: [PATCH 04/11] Fix: wrong documentation for ExecutionContext#enqueue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Johannes Müller --- src/execution_context/scheduler.cr | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/execution_context/scheduler.cr b/src/execution_context/scheduler.cr index fe5acab96500..aca04a3b05d4 100644 --- a/src/execution_context/scheduler.cr +++ b/src/execution_context/scheduler.cr @@ -19,11 +19,11 @@ module ExecutionContext # decide to support it or not (e.g. a single threaded context can accept it). abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber - # Suspends the execution of the current fiber and resumes the next runnable - # fiber. + # Enqueues *fiber* into this scheduler so it gets resumed at a later time. + # Doesn't suspend the current fiber: it keeps running after the call returns. # # Unsafe. Must only be called on `ExecutionContext.current`. Prefer - # `ExecutionContext.reschedule` instead. + # `ExecutionContext#enqueue` instead. 
protected abstract def enqueue(fiber : Fiber) : Nil # Suspends the execution of the current fiber and resumes the next runnable From 8abce65b7006b68c5f36de310117c40ce1d6224c Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 17 Jan 2025 14:57:57 +0100 Subject: [PATCH 05/11] Document Thread#dead_fiber (delayed fiber stack cleanup) --- src/crystal/system/thread.cr | 13 +++++++++++++ src/execution_context/scheduler.cr | 16 ++++++++++------ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 2b5e06498798..16ddff58253c 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -81,10 +81,23 @@ class Thread end # :nodoc: + # + # When a fiber terminates we can't release its stack until we swap context + # to another fiber: can't free/unmap nor push it to a shared stack pool; + # that would result in a segfault. + # + # Set by `ExecutionContext::Scheduler#swapcontext` before switching context + # from a terminating fiber to another fiber running on the same thread. def dead_fiber=(@dead_fiber : Fiber) : Fiber end # :nodoc: + # + # Checked by `ExecutionContext::Scheduler#swapcontext` after the context + # switch and by `Fiber#run` to release the stack of a dead fiber previously + # running on this thread. + # + # Automatically clears `@dead_fiber`: we only need the information once. def dead_fiber? : Fiber? if fiber = @dead_fiber @dead_fiber = nil diff --git a/src/execution_context/scheduler.cr b/src/execution_context/scheduler.cr index aca04a3b05d4..99305c1c4fb5 100644 --- a/src/execution_context/scheduler.cr +++ b/src/execution_context/scheduler.cr @@ -44,13 +44,17 @@ module ExecutionContext # Switches the thread from running the current fiber to run *fiber* instead. # - # Handles thread safety around fiber stacks: locks the GC to not start a - # collection while we're switching context, releases the stack of a dead - # fiber. 
+ # Handles thread safety around fiber stacks: + # + # 1. locks the GC to not start a collection while we're switching context + # 2. memorizes a dying fiber then releases its stack after switching context + # + # These two operations mean that `Fiber#run` must also release the GC lock + # and check for a dead fiber! # - # Unsafe. Must only be called by the current scheduler. Caller must ensure - # that the fiber indeed belongs to the current execution context, and that - # the fiber can indeed be resumed. + # Unsafe. Must only be called by the current scheduler. The caller must + # ensure that the fiber indeed belongs to the current execution context and + # that the fiber can indeed be resumed. protected def swapcontext(fiber : Fiber) : Nil current_fiber = thread.current_fiber From 7f2d0d48c5dc1eb7e21dc50ef6418d3e84f0bd89 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 17 Jan 2025 15:10:30 +0100 Subject: [PATCH 06/11] Fix: move require crystal/system/print_error to src/raise We need this because we don't load crystal/scheduler anymore when execution contexts are enabled. 
--- src/crystal/system/unix/signal.cr | 1 - src/raise.cr | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crystal/system/unix/signal.cr b/src/crystal/system/unix/signal.cr index f3c28b6f5a88..12804ea00267 100644 --- a/src/crystal/system/unix/signal.cr +++ b/src/crystal/system/unix/signal.cr @@ -2,7 +2,6 @@ require "c/signal" require "c/stdio" require "c/sys/wait" require "c/unistd" -require "../print_error" module Crystal::System::Signal # The number of libc functions that can be called safely from a signal(2) diff --git a/src/raise.cr b/src/raise.cr index 1ba0243def28..7ebb18f1320e 100644 --- a/src/raise.cr +++ b/src/raise.cr @@ -1,5 +1,6 @@ require "c/stdio" require "c/stdlib" +require "crystal/system/print_error" require "exception/call_stack" Exception::CallStack.skip(__FILE__) From 094393c5a098bb6f29508c73f0cdbdaca1a07372 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 20 Jan 2025 10:59:43 +0100 Subject: [PATCH 07/11] Rename as Fiber::ExecutionContext + add :nodoc: --- src/concurrent.cr | 8 +++---- src/crystal/event_loop.cr | 4 ++-- src/crystal/system/thread.cr | 17 +++++++------- src/crystal/tracing.cr | 4 ++-- .../execution_context.cr | 23 ++++++++++++++++--- .../execution_context/scheduler.cr | 7 ++++-- src/kernel.cr | 2 +- 7 files changed, 43 insertions(+), 22 deletions(-) rename src/{execution_context => fiber}/execution_context.cr (85%) rename src/{ => fiber}/execution_context/scheduler.cr (94%) diff --git a/src/concurrent.cr b/src/concurrent.cr index 1f1ad04bfd06..53cbff1d3c67 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -3,7 +3,7 @@ require "channel" require "crystal/tracing" {% if flag?(:execution_context) %} - require "execution_context" + require "fiber/execution_context" {% else %} require "crystal/scheduler" {% end %} @@ -29,7 +29,7 @@ def sleep(time : Time::Span) : Nil {% if flag?(:execution_context) %} Fiber.current.resume_event.add(time) - ExecutionContext.reschedule + 
Fiber::ExecutionContext.reschedule {% else %} Crystal::Scheduler.sleep(time) {% end %} @@ -40,7 +40,7 @@ end # Meanwhile, other ready-to-execute fibers might start their execution. def sleep : Nil {% if flag?(:execution_context) %} - ExecutionContext.reschedule + Fiber::ExecutionContext.reschedule {% else %} Crystal::Scheduler.reschedule {% end %} @@ -81,7 +81,7 @@ end # ``` def spawn(*, name : String? = nil, same_thread = false, &block) {% if flag?(:execution_context) %} - ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block) + Fiber::ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block) {% else %} fiber = Fiber.new(name, &block) Crystal.trace :sched, "spawn", fiber: fiber diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 1b2887a1c535..d16ba621a68e 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -28,7 +28,7 @@ abstract class Crystal::EventLoop @[AlwaysInline] def self.current : self {% if flag?(:execution_context) %} - ExecutionContext.current.event_loop + Fiber::ExecutionContext.current.event_loop {% else %} Crystal::Scheduler.event_loop {% end %} @@ -37,7 +37,7 @@ abstract class Crystal::EventLoop @[AlwaysInline] def self.current? : self | Nil {% if flag?(:execution_context) %} - ExecutionContext.current.event_loop + Fiber::ExecutionContext.current.event_loop {% else %} Crystal::Scheduler.event_loop? {% end %} diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 16ddff58253c..56bdd3d646e5 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -70,13 +70,13 @@ class Thread {% if flag?(:execution_context) %} # :nodoc: - getter! execution_context : ExecutionContext + getter! execution_context : Fiber::ExecutionContext # :nodoc: - property! current_scheduler : ExecutionContext::Scheduler + property! 
scheduler : Fiber::ExecutionContext::Scheduler # :nodoc: - def execution_context=(@execution_context : ExecutionContext) : ExecutionContext + def execution_context=(@execution_context : Fiber::ExecutionContext) : Fiber::ExecutionContext main_fiber.execution_context = execution_context end @@ -86,16 +86,17 @@ class Thread # to another fiber: can't free/unmap nor push it to a shared stack pool; # that would result in a segfault. # - # Set by `ExecutionContext::Scheduler#swapcontext` before switching context - # from a terminating fiber to another fiber running on the same thread. + # Set by `Fiber::ExecutionContext::Scheduler#swapcontext` before switching + # context from a terminating fiber to another fiber running on the same + # thread. def dead_fiber=(@dead_fiber : Fiber) : Fiber end # :nodoc: # - # Checked by `ExecutionContext::Scheduler#swapcontext` after the context - # switch and by `Fiber#run` to release the stack of a dead fiber previously - # running on this thread. + # Checked by `Fiber::ExecutionContext::Scheduler#swapcontext` after the + # context switch and by `Fiber#run` to release the stack of a dead fiber + # previously running on this thread. # # Automatically clears `@dead_fiber`: we only need the information once. def dead_fiber? : Fiber? 
diff --git a/src/crystal/tracing.cr b/src/crystal/tracing.cr index a6c1f747625f..ab88f841e67f 100644 --- a/src/crystal/tracing.cr +++ b/src/crystal/tracing.cr @@ -82,11 +82,11 @@ module Crystal end {% if flag?(:execution_context) %} - def write(value : ExecutionContext) : Nil + def write(value : Fiber::ExecutionContext) : Nil write value.name end - def write(value : ExecutionContext::Scheduler) : Nil + def write(value : Fiber::ExecutionContext::Scheduler) : Nil write value.name end {% end %} diff --git a/src/execution_context/execution_context.cr b/src/fiber/execution_context.cr similarity index 85% rename from src/execution_context/execution_context.cr rename to src/fiber/execution_context.cr index 4342945e3812..0f9eb4d7bcd9 100644 --- a/src/execution_context/execution_context.cr +++ b/src/fiber/execution_context.cr @@ -2,14 +2,17 @@ require "../crystal/event_loop" require "../crystal/system/thread" require "../crystal/system/thread_linked_list" require "../fiber" -require "../fiber/stack_pool" -require "./scheduler" +require "./stack_pool" +require "./execution_context/scheduler" {% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %} +{% raise "ERROR: execution contexts require the `execution_context` compilation flag" unless flag?(:execution_context) %} -module ExecutionContext +module Fiber::ExecutionContext @@default : ExecutionContext? + # Returns the default `ExecutionContext` for the process, automatically + # started when the program started. @[AlwaysInline] def self.default : ExecutionContext @@default.not_nil!("expected default execution context to have been setup") @@ -39,15 +42,19 @@ module ExecutionContext @@execution_contexts.try(&.unsafe_each { |execution_context| yield execution_context }) end + # Iterates all execution contexts. def self.each(&) : Nil execution_contexts.each { |execution_context| yield execution_context } end + # Returns the `ExecutionContext` the current fiber is running in. 
@[AlwaysInline] def self.current : ExecutionContext Thread.current.execution_context end + # :nodoc: + # # Tells the current scheduler to suspend the current fiber and resume the # next runnable fiber. The current fiber will never be resumed; you're # responsible to reenqueue it. @@ -59,6 +66,8 @@ module ExecutionContext Scheduler.current.reschedule end + # :nodoc: + # # Tells the current scheduler to suspend the current fiber and to resume # *fiber* instead. The current fiber will never be resumed; you're responsible # to reenqueue it. @@ -84,15 +93,23 @@ module ExecutionContext Fiber.new(name, self, &block).tap { |fiber| enqueue(fiber) } end + # :nodoc: + # # Legacy support for the `same_thread` argument. Each execution context may # decide to support it or not (e.g. a single threaded context can accept it). abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + # :nodoc: abstract def stack_pool : Fiber::StackPool + + # :nodoc: abstract def stack_pool? : Fiber::StackPool? + # :nodoc: abstract def event_loop : Crystal::EventLoop + # :nodoc: + # # Enqueues a fiber to be resumed inside the execution context. # # May be called from any ExecutionContext (i.e. must be thread-safe). diff --git a/src/execution_context/scheduler.cr b/src/fiber/execution_context/scheduler.cr similarity index 94% rename from src/execution_context/scheduler.cr rename to src/fiber/execution_context/scheduler.cr index 99305c1c4fb5..3a172a8da585 100644 --- a/src/execution_context/scheduler.cr +++ b/src/fiber/execution_context/scheduler.cr @@ -1,8 +1,9 @@ -module ExecutionContext +module Fiber::ExecutionContext + # :nodoc: module Scheduler @[AlwaysInline] def self.current : Scheduler - Thread.current.current_scheduler + Thread.current.scheduler end protected abstract def thread : Thread @@ -82,6 +83,8 @@ module ExecutionContext {% end %} end + # Returns the current status of the scheduler. For example `"running"`, + # `"event-loop"` or `"parked"`. 
abstract def status : String end end diff --git a/src/kernel.cr b/src/kernel.cr index c2af8771824e..054705c55e5b 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -609,7 +609,7 @@ end Exception::CallStack.setup_crash_handler {% if flag?(:execution_context) %} - ExecutionContext.init_default_context + Fiber::ExecutionContext.init_default_context {% else %} Crystal::Scheduler.init {% end %} From 0e86c551ab593de088701b332555e6af35577039 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 20 Jan 2025 13:59:44 +0100 Subject: [PATCH 08/11] Fix: handle dead fiber stack out of swapcontext --- src/crystal/system/thread.cr | 30 +++++++++------------- src/fiber.cr | 28 +++++++++------------ src/fiber/execution_context/scheduler.cr | 32 ++++++------------------ src/fiber/stack_pool.cr | 6 ++++- 4 files changed, 36 insertions(+), 60 deletions(-) diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 56bdd3d646e5..41f66a57b5dc 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -80,29 +80,23 @@ class Thread main_fiber.execution_context = execution_context end - # :nodoc: - # # When a fiber terminates we can't release its stack until we swap context - # to another fiber: can't free/unmap nor push it to a shared stack pool; + # to another fiber. We can't free/unmap nor push it to a shared stack pool, # that would result in a segfault. - # - # Set by `Fiber::ExecutionContext::Scheduler#swapcontext` before switching - # context from a terminating fiber to another fiber running on the same - # thread. - def dead_fiber=(@dead_fiber : Fiber) : Fiber + @dead_fiber_stack = Pointer(Void)? + + # :nodoc: + def dying_fiber(fiber : Fiber) : Pointer(Void)? + stack = @dead_fiber + @dead_fiber_stack = fiber.@stack + stack end # :nodoc: - # - # Checked by `Fiber::ExecutionContext::Scheduler#swapcontext` after the - # context switch and by `Fiber#run` to release the stack of a dead fiber - # previously running on this thread. 
- # - # Automatically clears `@dead_fiber`: we only need the information once. - def dead_fiber? : Fiber? - if fiber = @dead_fiber - @dead_fiber = nil - fiber + def dead_fiber_stack? : Pointer(Void)? + if stack = @dead_fiber_stack + @dead_fiber_stack = nil + stack end end {% else %} diff --git a/src/fiber.cr b/src/fiber.cr index 39a9f2bf2b85..cba188a6028e 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -166,14 +166,6 @@ class Fiber def run GC.unlock_read - {% if flag?(:execution_context) && !flag?(:interpreted) %} - # if the fiber previously running on this thread has terminated, we can - # now safely release its stack - if fiber = Thread.current.dead_fiber? - fiber.execution_context.stack_pool.release(fiber.@stack) - end - {% end %} - @proc.call rescue ex if name = @name @@ -192,14 +184,18 @@ class Fiber @alive = false - {% unless flag?(:interpreted) || flag?(:execution_context) %} - # interpreted: the interpreter is managing the stacks - # - # execution context: do not prematurely release the stack before we switch - # to another fiber so we don't end up with a thread reusing a stack for a - # new fiber while the current fiber isn't fully terminated (oops); even - # without the pool, we can't unmap before we swap context. 
- Crystal::Scheduler.stack_pool.release(@stack) + # the interpreter is managing the stacks + {% unless flag?(:interpreted) %} + {% if flag?(:execution_context) %} + # do not prematurely release the stack before we switch to another fiber + if stack = Thread.current.dying_fiber(self) + # we can however release the stack of a previously dying fiber (we + # since swapped context) + execution_context.stack_pool.release(stack) + end + {% else %} + Crystal::Scheduler.stack_pool.release(@stack) + {% end %} {% end %} Fiber.suspend diff --git a/src/fiber/execution_context/scheduler.cr b/src/fiber/execution_context/scheduler.cr index 3a172a8da585..c561fcf7e2ea 100644 --- a/src/fiber/execution_context/scheduler.cr +++ b/src/fiber/execution_context/scheduler.cr @@ -45,42 +45,24 @@ module Fiber::ExecutionContext # Switches the thread from running the current fiber to run *fiber* instead. # - # Handles thread safety around fiber stacks: - # - # 1. locks the GC to not start a collection while we're switching context - # 2. memorizes a dying fiber then releases its stack after switching context - # - # These two operations mean that `Fiber#run` must also release the GC lock - # and check for a dead fiber! + # Handles thread safety around fiber stacks by locking the GC, so it won't + # start a GC collection while we're switching context. # # Unsafe. Must only be called by the current scheduler. The caller must # ensure that the fiber indeed belongs to the current execution context and # that the fiber can indeed be resumed. + # + # WARNING: after switching context you can't trust *self* anymore (it is the + # scheduler that resumed *fiber* which may not be the one that suspended + # *fiber*) or instance variables; local variables however are the ones from + # before swapping context. protected def swapcontext(fiber : Fiber) : Nil current_fiber = thread.current_fiber - {% unless flag?(:interpreted) %} - thread.dead_fiber = current_fiber if current_fiber.dead? 
- {% end %} - GC.lock_read thread.current_fiber = fiber Fiber.swapcontext(pointerof(current_fiber.@context), pointerof(fiber.@context)) GC.unlock_read - - # we switched context so we can't trust *self* anymore (it is the - # scheduler that rescheduled *fiber* which may be another scheduler) as - # well as any other local or instance variables (e.g. we must resolve - # `Thread.current` again) - # - # that being said, we can still trust the *current_fiber* local variable - # (it's the only exception) - - {% unless flag?(:interpreted) %} - if fiber = Thread.current.dead_fiber? - fiber.execution_context.stack_pool.release(fiber.@stack) - end - {% end %} end # Returns the current status of the scheduler. For example `"running"`, diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr index 062785b61313..1df217fc4356 100644 --- a/src/fiber/stack_pool.cr +++ b/src/fiber/stack_pool.cr @@ -79,7 +79,11 @@ class Fiber private def pop? {% if flag?(:execution_context) %} - @lock.sync { @deque.pop? } unless @deque.empty? + if stack = Thread.current.dead_fiber_stack? + stack + else + @lock.sync { @deque.pop? } unless @deque.empty? + end {% else %} @deque.pop? {% end %} From ce23a0e62b9164687380299ce9fa0fed6e7fddf5 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 20 Jan 2025 18:37:33 +0100 Subject: [PATCH 09/11] Fix: compilation with Thread dead_fiber_stack --- src/crystal/system/thread.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 41f66a57b5dc..1c0c9838575d 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -83,11 +83,11 @@ class Thread # When a fiber terminates we can't release its stack until we swap context # to another fiber. We can't free/unmap nor push it to a shared stack pool, # that would result in a segfault. - @dead_fiber_stack = Pointer(Void)? + @dead_fiber_stack : Pointer(Void)? 
# :nodoc: def dying_fiber(fiber : Fiber) : Pointer(Void)? - stack = @dead_fiber + stack = @dead_fiber_stack @dead_fiber_stack = fiber.@stack stack end From 95718e6b8746362798121c2946bb393ad41a2c97 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 20 Jan 2025 19:15:21 +0100 Subject: [PATCH 10/11] Fixup: can't infer type of ivar declared in macro (stack pool lock) --- src/fiber/stack_pool.cr | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr index 1df217fc4356..65452fb1056e 100644 --- a/src/fiber/stack_pool.cr +++ b/src/fiber/stack_pool.cr @@ -5,6 +5,9 @@ class Fiber class StackPool STACK_SIZE = 8 * 1024 * 1024 + # FIXME: crystal can't infer ivars declared inside macros... + @lock = uninitialized Crystal::SpinLock + # If *protect* is true, guards all top pages (pages with the lowest address # values) in the allocated stacks; accessing them triggers an error # condition, allowing stack overflows on non-main fibers to be detected. From f6e11e25067b1f6ecae3154024849237dfa5b981 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 3 Feb 2025 10:04:39 +0100 Subject: [PATCH 11/11] fixup! Fixup: can't infer type of ivar declared in macro (stack pool lock) --- src/fiber/stack_pool.cr | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr index 65452fb1056e..d3b91d8eb25a 100644 --- a/src/fiber/stack_pool.cr +++ b/src/fiber/stack_pool.cr @@ -5,8 +5,10 @@ class Fiber class StackPool STACK_SIZE = 8 * 1024 * 1024 - # FIXME: crystal can't infer ivars declared inside macros... - @lock = uninitialized Crystal::SpinLock + {% if flag?(:execution_context) %} + # must explicitly declare the variable because of the macro in #initialize + @lock = uninitialized Crystal::SpinLock + {% end %} # If *protect* is true, guards all top pages (pages with the lowest address # values) in the allocated stacks; accessing them triggers an error