Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC 2: Skeleton for ExecutionContext #15350

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
41 changes: 31 additions & 10 deletions src/concurrent.cr
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
require "fiber"
require "channel"
require "crystal/scheduler"
require "crystal/tracing"

{% if flag?(:execution_context) %}
require "fiber/execution_context"
{% else %}
require "crystal/scheduler"
{% end %}

# Blocks the current fiber for the specified number of seconds.
#
# While this fiber is waiting this time, other ready-to-execute
Expand All @@ -12,25 +17,36 @@ def sleep(seconds : Number) : Nil
if seconds < 0
raise ArgumentError.new "Sleep seconds must be positive"
end

Crystal::Scheduler.sleep(seconds.seconds)
sleep(seconds.seconds)
end

# Blocks the current Fiber for the specified time span.
#
# While this fiber is waiting this time, other ready-to-execute
# fibers might start their execution.
def sleep(time : Time::Span) : Nil
Crystal::Scheduler.sleep(time)
Crystal.trace :sched, "sleep", for: time

{% if flag?(:execution_context) %}
Fiber.current.resume_event.add(time)
Fiber::ExecutionContext.reschedule
{% else %}
Crystal::Scheduler.sleep(time)
{% end %}
end

# Blocks the current fiber forever.
#
# Meanwhile, other ready-to-execute fibers might start their execution.
def sleep : Nil
Crystal::Scheduler.reschedule
{% if flag?(:execution_context) %}
Fiber::ExecutionContext.reschedule
{% else %}
Crystal::Scheduler.reschedule
{% end %}
end

{% begin %}
# Spawns a new fiber.
#
# NOTE: The newly created fiber doesn't run as soon as spawned.
Expand Down Expand Up @@ -64,12 +80,17 @@ end
# wg.wait
# ```
def spawn(*, name : String? = nil, same_thread = false, &block)
fiber = Fiber.new(name, &block)
Crystal.trace :sched, "spawn", fiber: fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
{% if flag?(:execution_context) %}
Fiber::ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block)
{% else %}
fiber = Fiber.new(name, &block)
Crystal.trace :sched, "spawn", fiber: fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
{% end %}
end
{% end %}

# Spawns a fiber by first creating a `Proc`, passing the *call*'s
# expressions to it, and letting the `Proc` finally invoke the *call*.
Expand Down
21 changes: 18 additions & 3 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ abstract class Crystal::EventLoop

@[AlwaysInline]
def self.current : self
Crystal::Scheduler.event_loop
{% if flag?(:execution_context) %}
Fiber::ExecutionContext.current.event_loop
{% else %}
Crystal::Scheduler.event_loop
{% end %}
end

@[AlwaysInline]
def self.current? : self?
Crystal::Scheduler.event_loop?
def self.current? : self | Nil
{% if flag?(:execution_context) %}
Fiber::ExecutionContext.current.event_loop
{% else %}
Crystal::Scheduler.event_loop?
{% end %}
end

# Runs the loop.
Expand All @@ -46,6 +54,13 @@ abstract class Crystal::EventLoop
# events.
abstract def run(blocking : Bool) : Bool

{% if flag?(:execution_context) %}
# Same as `#run` but collects runnable fibers into *queue* instead of
# enqueueing in parallel, so the caller is responsible and in control for
# when and how the fibers will be enqueued.
abstract def run(queue : Fiber::Queue*, blocking : Bool) : Nil
{% end %}

# Tells a blocking run loop to no longer wait for events to activate. It may
# for example enqueue a NOOP event with an immediate (or past) timeout. Having
# activated an event, the loop shall return, allowing the blocked thread to
Expand Down
8 changes: 8 additions & 0 deletions src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
iocp
end

# thread unsafe
def run(blocking : Bool) : Bool
enqueued = false

Expand All @@ -66,6 +67,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
enqueued
end

{% if flag?(:execution_context) %}
# thread unsafe
def run(queue : Fiber::Queue*, blocking : Bool) : Nil
run_impl(blocking) { |fiber| queue.value.push(fiber) }
end
{% end %}

# Runs the event loop and enqueues the fiber for the next upcoming event or
# completion.
private def run_impl(blocking : Bool, &) : Nil
Expand Down
43 changes: 36 additions & 7 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,55 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
event_base.loop(flags)
end

{% if flag?(:execution_context) %}
def run(queue : Fiber::Queue*, blocking : Bool) : Nil
Crystal.trace :evloop, "run", fiber: fiber, blocking: blocking
@runnables = queue
run(blocking)
ensure
@runnables = nil
end

def callback_enqueue(fiber : Fiber) : Nil
if queue = @runnables
queue.value.push(fiber)
else
raise "BUG: libevent callback executed outside of #run(queue*, blocking) call"
end
end
{% end %}

def interrupt : Nil
event_base.loop_exit
end

# Create a new resume event for a fiber.
# Create a new resume event for a fiber (sleep).
def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event
event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
data.as(Fiber).enqueue
f = data.as(Fiber)
{% if flag?(:execution_context) %}
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
event_loop.callback_enqueue(f)
{% else %}
f.enqueue
{% end %}
end
end

# Creates a timeout_event.
# Creates a timeout event (timeout action of select expression).
def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event
event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
f = data.as(Fiber)
if (select_action = f.timeout_select_action)
if select_action = f.timeout_select_action
f.timeout_select_action = nil
select_action.time_expired(f)
else
f.enqueue
if select_action.time_expired?
{% if flag?(:execution_context) %}
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
event_loop.callback_enqueue(f)
{% else %}
f.enqueue
{% end %}
end
end
end
end
Expand Down
27 changes: 23 additions & 4 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,25 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
{% end %}

# NOTE: thread unsafe
# thread unsafe
def run(blocking : Bool) : Bool
system_run(blocking) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end
true
end

{% if flag?(:execution_context) %}
# thread unsafe
def run(queue : Fiber::Queue*, blocking : Bool) : Nil
system_run(blocking) { |fiber| queue.value.push(fiber) }
end
{% end %}

# fiber interface, see Crystal::EventLoop

def create_resume_event(fiber : Fiber) : FiberEvent
Expand Down Expand Up @@ -303,13 +314,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
Polling.arena.free(index) do |pd|
[email protected]_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end)
end

[email protected]_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end)
end

Expand Down
3 changes: 2 additions & 1 deletion src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{% skip_file if flag?(:execution_context) %}

require "crystal/event_loop"
require "crystal/system/print_error"
require "fiber"
Expand Down Expand Up @@ -66,7 +68,6 @@ class Crystal::Scheduler
end

def self.sleep(time : Time::Span) : Nil
Crystal.trace :sched, "sleep", for: time
Thread.current.scheduler.sleep(time)
end

Expand Down
49 changes: 41 additions & 8 deletions src/crystal/system/thread.cr
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,47 @@ class Thread

getter name : String?

{% if flag?(:execution_context) %}
# :nodoc:
getter! execution_context : Fiber::ExecutionContext

# :nodoc:
property! scheduler : Fiber::ExecutionContext::Scheduler

# :nodoc:
def execution_context=(@execution_context : Fiber::ExecutionContext) : Fiber::ExecutionContext
main_fiber.execution_context = execution_context
end

# When a fiber terminates we can't release its stack until we swap context
# to another fiber. We can't free/unmap nor push it to a shared stack pool,
# that would result in a segfault.
@dead_fiber_stack : Pointer(Void)?

# :nodoc:
def dying_fiber(fiber : Fiber) : Pointer(Void)?
stack = @dead_fiber_stack
@dead_fiber_stack = fiber.@stack
stack
end

# :nodoc:
def dead_fiber_stack? : Pointer(Void)?
if stack = @dead_fiber_stack
@dead_fiber_stack = nil
stack
end
end
{% else %}
# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

# :nodoc:
def scheduler? : ::Crystal::Scheduler?
@scheduler
end
{% end %}

def self.unsafe_each(&)
# nothing to iterate when @@threads is nil + don't lazily allocate in a
# method called from a GC collection callback!
Expand Down Expand Up @@ -154,14 +195,6 @@ class Thread
thread.name = name
end

# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

# :nodoc:
def scheduler? : ::Crystal::Scheduler?
@scheduler
end

protected def start
Thread.threads.push(self)
Thread.current = self
Expand Down
10 changes: 10 additions & 0 deletions src/crystal/tracing.cr
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ module Crystal
write value.name || '?'
end

{% if flag?(:execution_context) %}
def write(value : Fiber::ExecutionContext) : Nil
write value.name
end

def write(value : Fiber::ExecutionContext::Scheduler) : Nil
write value.name
end
{% end %}

def write(value : Pointer) : Nil
write "0x"
System.to_int_slice(value.address, 16, true, 2) { |bytes| write(bytes) }
Expand Down
Loading