Skip to content

Commit

Permalink
fixup! Channel pool
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Feb 17, 2024
1 parent b79013d commit c044a6f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
14 changes: 8 additions & 6 deletions src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module AMQProxy
@upstreams = Deque(Upstream).new

def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32)
spawn shrink_pool_loop, name: "shrink pool loop"
end

def get(downstream_channel : DownstreamChannel) : UpstreamChannel
Expand All @@ -35,6 +36,7 @@ module AMQProxy
end

private def add_upstream
Log.info { "Adding upstream connection" }
upstream = Upstream.new(@host, @port, @tls_ctx, @credentials)
@upstreams.unshift upstream
end
Expand All @@ -44,6 +46,7 @@ module AMQProxy
end

def close
Log.info { "Closing all upstream connections" }
@lock.synchronize do
while u = @upstreams.shift?
begin
Expand All @@ -57,12 +60,11 @@ module AMQProxy

private def shrink_pool_loop
loop do
sleep 5.seconds
sleep @idle_connection_timeout.seconds
@lock.synchronize do
max_connection_age = Time.monotonic - @idle_connection_timeout.seconds
@upstreams.size.times do
u = @upstreams.shift
if u.last_used < max_connection_age
(@upstreams.size - 1).times do
u = @upstreams.pop
if u.channels.zero?
begin
u.close "Pooled connection closed due to inactivity"
rescue ex
Expand All @@ -71,7 +73,7 @@ module AMQProxy
elsif u.closed?
Log.error { "Removing closed upstream connection from pool" }
else
@upstreams.push u
@upstreams.unshift u
end
end
end
Expand Down
6 changes: 4 additions & 2 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ require "./upstream"
module AMQProxy
class Server
Log = ::Log.for(self)
@clients_lock = Mutex.new
@clients = Array(Client).new

def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5)
@clients_lock = Mutex.new
@clients = Array(Client).new
tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls
@channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials|
hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout)
Expand Down Expand Up @@ -42,10 +42,12 @@ module AMQProxy
end

def disconnect_clients
Log.info { "Disconnecting clients" }
@clients_lock.synchronize do
@clients.each &.close # send Connection#Close frames
end
sleep 1 # wait for clients to disconnect voluntarily
Log.info { "Closing client sockets" }
@clients_lock.synchronize do
@clients.each &.close_socket # close sockets forcefully
end
Expand Down
7 changes: 5 additions & 2 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require "./channel_pool"
module AMQProxy
class Upstream
Log = ::Log.for(self)
property last_used = Time.monotonic
@socket : IO
@unsafe_channels = Set(UInt16).new
@channels = Hash(UInt16, DownstreamChannel?).new
Expand Down Expand Up @@ -66,8 +65,12 @@ module AMQProxy
end
end

def channels
@channels.size
end

# Frames from upstream (to client)
def read_loop(socket = @socket)
private def read_loop(socket = @socket)
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Heartbeat then send frame
Expand Down

0 comments on commit c044a6f

Please sign in to comment.