Skip to content

Commit

Permalink
WIP: Thread based concurrency
Browse files Browse the repository at this point in the history
Concurrently runs a number of 'Racecar::Runner' instances in a fixed size thread pool.

Each thread starts a single `Racecar::Runner` with `Racecar::Consumer`
class instance. All threads run the same consumer class, have the same config
and consume partitions from the same topic(s).

`ThreadPoolRunnerProxy` can be combined with `ParallelRunner`, to run
forks and threads. `ParallelRunner` is not used or battle tested (at
Zendesk) and so this is still not a recommended thing to do.

Other inclusions:

- Signal handling has been moved up one level to the CLI
- Runner-like object interface standardized to `#run` `#stop` `#running?`
- Test can be run locally with Docker, `export LOCAL=1`
- Some test bugs have been fixed, connections now always close and
  orphaned processes raise an exception
  • Loading branch information
bestie committed Jan 25, 2023
1 parent fd33e62 commit 3ca2bf0
Show file tree
Hide file tree
Showing 14 changed files with 395 additions and 54 deletions.
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PATH
remote: .
specs:
racecar (2.8.2)
concurrent-ruby
king_konf (~> 1.0.0)
rdkafka (~> 0.12.0)

Expand Down
30 changes: 26 additions & 4 deletions lib/racecar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require "racecar/consumer_set"
require "racecar/runner"
require "racecar/parallel_runner"
require "racecar/threadpool_runner_proxy"
require "racecar/config"
require "racecar/version"
require "ensure_hash_compact"
Expand Down Expand Up @@ -52,12 +53,33 @@ def self.instrumenter
end

def self.run(processor)
runner = Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter)
runner(processor).run
end

if config.parallel_workers && config.parallel_workers > 1
ParallelRunner.new(runner: runner, config: config, logger: logger).run
def self.runner(processor)
if config.threads && config.threads > 1
runner = multithreaded_runner(processor)
else
runner.run
runner = standard_runner(processor)
end

if config.forks && config.forks > 1
runner = forking_runner(processor, runner)
end

runner
end

private_class_method def self.forking_runner(processor, base_runner)
ParallelRunner.new(runner: base_runner, config: config, logger: logger)
end

private_class_method def self.multithreaded_runner(processor)
standard_runner_factory = method(:standard_runner)
ThreadPoolRunnerProxy.new(processor, config: config, runner_factory: standard_runner_factory)
end

private_class_method def self.standard_runner(processor)
Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter)
end
end
30 changes: 25 additions & 5 deletions lib/racecar/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ def run
require "./config/racecar"
end

# Find the consumer class by name.
consumer_class = Kernel.const_get(consumer_name)

# Load config defined by the consumer class itself.
config.load_consumer_class(consumer_class)

Expand All @@ -58,13 +55,36 @@ def run
$stderr.puts "=> Ctrl-C to shutdown consumer"
end

processor = consumer_class.new
Racecar.run(processor)
install_signal_handlers
runner.run
nil
end

def stop
runner.stop
end

private

def install_signal_handlers
# SIGINT, SIGTERM, SIGQUIT stop the runner.
# The runner stops all its consumers, processes and/or threads.
Signal.trap("QUIT") { stop }
Signal.trap("INT") { stop }
Signal.trap("TERM") { stop }

# Print the consumer config to STDERR on USR1.
Signal.trap("USR1") { $stderr.puts config.inspect }
end

def runner
@runner ||= Racecar.runner(consumer_class.new)
end

def consumer_class
consumer_class = Kernel.const_get(consumer_name)
end

attr_reader :consumer_name

def config
Expand Down
20 changes: 17 additions & 3 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class Config < KingKonf::Config
desc "How long to try to deliver a produced message before finally giving up (in seconds)"
float :message_timeout, default: 5*60

desc "How long to wait for the thread pool to be ready or to shutdown (in seconds)"
float :thread_pool_timeout, default: 5

desc "Maximum amount of data the broker shall return for a Fetch request"
integer :max_bytes, default: 10485760

Expand Down Expand Up @@ -170,7 +173,17 @@ class Config < KingKonf::Config
# The error handler must be set directly on the object.
attr_reader :error_handler

attr_accessor :subscriptions, :logger, :parallel_workers
attr_accessor :subscriptions, :logger, :forks, :threads

# For deprecation
def parallel_workers
self.forks
end

# For deprecation
def parallel_workers=(forks)
self.forks = forks
end

def statistics_interval_ms
if Rdkafka::Config.statistics_callback
Expand Down Expand Up @@ -223,7 +236,8 @@ def load_consumer_class(consumer_class)
consumer_class.name.gsub(/[a-z][A-Z]/) { |str| "#{str[0]}-#{str[1]}" }.downcase,
].compact.join

self.parallel_workers = consumer_class.parallel_workers
self.forks = consumer_class.forks
self.threads = consumer_class.threads
self.subscriptions = consumer_class.subscriptions
self.max_wait_time = consumer_class.max_wait_time || self.max_wait_time
self.fetch_messages = consumer_class.fetch_messages || self.fetch_messages
Expand Down Expand Up @@ -255,7 +269,7 @@ def rdkafka_producer
def rdkafka_security_config
{
"security.protocol" => security_protocol,
"enable.ssl.certificate.verification" => ssl_verify_hostname,
# "enable.ssl.certificate.verification" => ssl_verify_hostname,
"ssl.ca.location" => ssl_ca_location,
"ssl.crl.location" => ssl_crl_location,
"ssl.keystore.location" => ssl_keystore_location,
Expand Down
22 changes: 19 additions & 3 deletions lib/racecar/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,25 @@ class Consumer
Subscription = Struct.new(:topic, :start_from_beginning, :max_bytes_per_partition, :additional_config)

class << self
attr_accessor :max_wait_time
attr_accessor :group_id
attr_accessor :producer, :consumer, :parallel_workers, :fetch_messages
attr_accessor(
:max_wait_time,
:group_id,
:producer,
:consumer,
:fetch_messages,
:forks,
:threads,
)

# For deprecation
def parallel_workers
self.forks
end

# For deprecation
def parallel_workers=(forks)
self.forks = forks
end

def subscriptions
@subscriptions ||= []
Expand Down
9 changes: 9 additions & 0 deletions lib/racecar/parallel_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def initialize(runner:, config:, logger:)
@runner = runner
@config = config
@logger = logger
@workers = []
end

def worker_pids
Expand All @@ -20,6 +21,14 @@ def running?
@running
end

def stop
if workers.any?
terminate_workers
else
runner.stop
end
end

def run
logger.info "=> Running with #{config.parallel_workers} parallel workers"

Expand Down
19 changes: 8 additions & 11 deletions lib/racecar/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def initialize(processor, config:, logger:, instrumenter: NullInstrumenter)
@processor, @config, @logger = processor, config, logger
@instrumenter = instrumenter
@stop_requested = false
@running = false
Rdkafka::Config.logger = logger

if processor.respond_to?(:statistics_callback)
Expand Down Expand Up @@ -46,8 +47,8 @@ def setup_pauses
end

def run
install_signal_handlers
@stop_requested = false
@running = true

# Configure the consumer with a producer so it can produce messages and
# with a consumer so that it can support advanced use-cases.
Expand Down Expand Up @@ -92,6 +93,8 @@ def run
end
end
ensure
@running = false

producer.close
Racecar::Datadog.close if Object.const_defined?("Racecar::Datadog")
end
Expand All @@ -100,6 +103,10 @@ def stop
@stop_requested = true
end

def running?
!!@running
end

private

attr_reader :pauses
Expand Down Expand Up @@ -167,16 +174,6 @@ def delivery_callback
end
end

def install_signal_handlers
# Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
trap("QUIT") { stop }
trap("INT") { stop }
trap("TERM") { stop }

# Print the consumer config to STDERR on USR1.
trap("USR1") { $stderr.puts config.inspect }
end

def process(message)
instrumentation_payload = {
consumer_class: processor.class.to_s,
Expand Down
115 changes: 115 additions & 0 deletions lib/racecar/threadpool_runner_proxy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
require "concurrent-ruby"

module Racecar
class ThreadPoolRunnerProxy
def initialize(processor, config:, runner_factory:)
@processor = processor
@config = config
@runner_factory = runner_factory

@parent_thread = Thread.current
@runners = []
end

attr_reader :runner_factory, :processor, :config
private :runner_factory, :processor, :config

attr_reader :parent_thread, :runners
private :parent_thread, :runners

def stop
runners.each(&:stop)
end

def running?
@runners.any?(&:running?)
end

def run
Thread.current.name = "main"

logger.debug("Spawning #{thread_count} " + "🧵" * thread_count)
logger.info("ThreadPoolRunnerProxy starting #{thread_count} worker threads")

thread_count.times do |id|
thread_pool.post do
work_in_thread(id)
end
end

wait_for_runners_to_start
logger.debug("Threaded runners running 🧵🏃")
wait_for_normal_stop
ensure
ensure_thread_pool_terminates
nil
end

private

attr_reader :runners

def work_in_thread(id)
Thread.current.name = "Racecar worker thread #{Process.pid}-#{id}"
Thread.current.abort_on_exception = true

runner = runner_factory.call(processor)
runners << runner
logger.debug("ThreadPoolRunnerProxy starting runner #{runner} with consumer #{processor} 🏃🏁")

runner.run

logger.debug("ThreadPoolRunnerProxy runner stopped #{runner} 🏃🛑")
rescue Exception => e
logger.error("Error in threadpool, raising in parent thread. #{e.full_message}")
stop
parent_thread.raise(e)
end

def wait_for_runners_to_start
started_waiting_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

until runners.length == thread_count && runners.all?(&:running?)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_waiting_at

summary = runners.map { |r| "#{r}: running=#{r.running?}" }.join(", ")
logger.debug("Waiting for runners to start #{summary} 🥱🏃🏃🏃🏁")

break if elapsed > thread_pool_timeout
sleep(thread_check_interval)
end
end

def wait_for_normal_stop
until @runners.none?(&:running?)
sleep(thread_check_interval)
end
logger.debug("ThreadPoolRunnerProxy all runners have stopped 🏃🏃🏃 🛑🛑🛑")
end

def ensure_thread_pool_terminates
logger.debug("ThreadPoolRunnerProxy waiting for thread pool to terminate 🎱💀💀💀")
thread_pool.wait_for_termination(thread_pool_timeout)
end

def thread_pool
@thread_pool ||= Concurrent::FixedThreadPool.new(thread_count)
end

def thread_count
@threads ||= (config.threads || 1)
end

def thread_pool_timeout
config.thread_pool_timeout
end

def thread_check_interval
1
end

def logger
config.logger
end
end
end
1 change: 1 addition & 0 deletions racecar.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Gem::Specification.new do |spec|

spec.add_runtime_dependency "king_konf", "~> 1.0.0"
spec.add_runtime_dependency "rdkafka", "~> 0.12.0"
spec.add_runtime_dependency "concurrent-ruby"

spec.add_development_dependency "bundler", [">= 1.13", "< 3"]
spec.add_development_dependency "pry"
Expand Down
Loading

0 comments on commit 3ca2bf0

Please sign in to comment.