Skip to content

Commit

Permalink
fixup! Channel pool
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Feb 18, 2024
1 parent 4bb7f52 commit 5a5ca8b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
5 changes: 2 additions & 3 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require "./records"
module AMQProxy
class Client
getter credentials : Credentials
@lock = Mutex.new
@channel_map = Hash(UInt16, UpstreamChannel).new
@outgoing_frames = Channel(AMQ::Protocol::Frame).new(128)
@frame_max : UInt32
Expand Down Expand Up @@ -59,7 +58,7 @@ module AMQProxy
end
end
end
rescue ex : IO::Error
rescue ex : IO::Error | OpenSSL::SSL::Error
raise Error.new("Client disconnected", ex) unless socket.closed?
ensure
@outgoing_frames.close
Expand All @@ -73,7 +72,7 @@ module AMQProxy

break if frame.is_a? AMQ::Protocol::Frame::Connection::CloseOk
end
rescue ex : IO::Error
rescue ex : IO::Error | OpenSSL::SSL::Error
raise ex unless socket.closed?
ensure
@outgoing_frames.close
Expand Down
20 changes: 11 additions & 9 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ module AMQProxy
end
return
when AMQ::Protocol::Frame::Connection::CloseOk then return
when AMQ::Protocol::Frame::Channel::OpenOk
when AMQ::Protocol::Frame::Channel::Close # when upstream server requested a channel close
when AMQ::Protocol::Frame::Channel::OpenOk # we assume it always succeeds
when AMQ::Protocol::Frame::Channel::Close # when upstream server requested a channel close
@channels_lock.synchronize do
@unsafe_channels.delete(frame.channel)
if downstream_channel = @channels.delete(frame.channel)
Expand All @@ -95,14 +95,17 @@ module AMQProxy
if downstream_channel = @channels[frame.channel]?
downstream_channel.write(frame)
else
raise "frame for unmapped channel from upstream: #{frame}"
Log.debug { "Frame for unmapped channel from upstream: #{frame}" }
send AMQ::Protocol::Frame::Channel::Close.new(frame.channel, 500_u16,
"DOWNSTREAM_DISCONNECTED", 0_u16, 0_u16)
end
end
end
rescue ex : IO::Error | OpenSSL::SSL::Error
Log.error(exception: ex) { "Error reading from upstream" } unless socket.closed?
ensure
socket.close rescue nil
close_all_client_channels
end

def closed?
Expand All @@ -121,14 +124,13 @@ module AMQProxy
# Forward frames from client to upstream
def write(frame : AMQ::Protocol::Frame) : Nil
case frame
when AMQ::Protocol::Frame::Basic::Publish, AMQ::Protocol::Frame::Basic::Qos
when AMQ::Protocol::Frame::Basic::Publish,
AMQ::Protocol::Frame::Basic::Qos
when AMQ::Protocol::Frame::Basic::Get
@unsafe_channels.add(frame.channel) unless frame.no_ack
when AMQ::Protocol::Frame::Basic
@unsafe_channels.add(frame.channel)
when AMQ::Protocol::Frame::Confirm
@unsafe_channels.add(frame.channel)
when AMQ::Protocol::Frame::Tx
when AMQ::Protocol::Frame::Basic,
AMQ::Protocol::Frame::Confirm,
AMQ::Protocol::Frame::Tx
@unsafe_channels.add(frame.channel)
when AMQ::Protocol::Frame::Connection
raise "Connection frames should not be sent through here: #{frame}"
Expand Down

0 comments on commit 5a5ca8b

Please sign in to comment.