Skip to content

Commit

Permalink
Add --term-client-close-timeout to give clients a chance finish their…
Browse files Browse the repository at this point in the history
… work
  • Loading branch information
spuun committed Sep 25, 2024
1 parent 2ac81bb commit c843192
Showing 1 changed file with 45 additions and 11 deletions.
56 changes: 45 additions & 11 deletions src/amqproxy.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class AMQProxy::CLI
@log_level : Log::Severity = Log::Severity::Info
@idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i
@term_timeout = -1
@term_client_close_timeout = 0
@upstream = ENV["AMQP_URL"]?

def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity
Expand All @@ -19,11 +20,12 @@ class AMQProxy::CLI
when "main", ""
section.each do |key, value|
case key
when "upstream" then @upstream = value
when "log_level" then @log_level = Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
when "term_timeout" then @term_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
when "upstream" then @upstream = value
when "log_level" then @log_level = Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
when "term_timeout" then @term_timeout = value.to_i
when "term_client_close_timeout" then @term_client_close_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
end
end
when "listen"
Expand Down Expand Up @@ -52,9 +54,12 @@ class AMQProxy::CLI
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
end
parser.on("--term-timeout=SECONDS", "At TERM the server will wait this many seconds for clients to gracefully close their sockets (default: infinite)") do |v|
parser.on("--term-timeout=SECONDS", "At TERM the server SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v|
@term_timeout = v.to_i
end
parser.on("--term-client-close-timeout=SECONDS", "At TERM the server SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v|
@term_client_close_timeout = v.to_i
end
parser.on("-d", "--debug", "Verbose logging") { @log_level = Log::Severity::Debug }
parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) }
parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 }
Expand Down Expand Up @@ -105,18 +110,47 @@ class AMQProxy::CLI
until server.client_connections.zero?
sleep 0.2
end
Log.info { "No clients left. Exiting." }
end

def shutdown(server)
server.disconnect_clients
if @term_timeout >= 0
spawn do
sleep @term_timeout
abort "Exiting with #{server.client_connections} client connections still open"
if server.client_connections > 0
if @term_client_close_timeout > 0
wait_for_clients_to_close(server, @term_client_close_timeout.seconds)
end
server.disconnect_clients
end

if server.client_connections > 0
if @term_timeout >= 0
spawn do
sleep @term_timeout
abort "Exiting with #{server.client_connections} client connections still open"
end
end
end
end

def wait_for_clients_to_close(server, close_timeout)
Log.info { "Waiting for clients to close their connections." }
ch = Channel(Bool).new
spawn do
loop do
ch.send true if server.client_connections.zero?
sleep 0.1.seconds
end
rescue Channel::ClosedError
end

select
when ch.receive?
Log.info { "All clients has closed their connections." }
when timeout close_timeout
ch.close
Log.info { "Timeout waiting for clients to close their connections." }
end
end

struct JournalLogFormat < Log::StaticFormatter
def run
source
Expand Down

0 comments on commit c843192

Please sign in to comment.