Skip to content

Commit

Permalink
On channel error, correctly send close-ok from client to upstream (#152)
Browse files Browse the repository at this point in the history
Fixes #151
  • Loading branch information
carlhoerberg authored Mar 9, 2024
1 parent 86c86d1 commit 6366582
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
19 changes: 19 additions & 0 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,23 @@ describe AMQProxy::Server do
s.upstream_connections.should eq 1
(Time.utc.to_unix - started).should be < 30
end

it "works after server closes channel" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
AMQP::Client.start("amqp://localhost:5673") do |conn|
qname = "test#{rand}"
3.times do
expect_raises(AMQP::Client::Channel::ClosedException) do
ch = conn.channel
ch.basic_consume(qname) { }
end
end
end
ensure
s.stop_accepting_clients
end
end
end
4 changes: 3 additions & 1 deletion src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ module AMQProxy
end
write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::CloseOk
# noop
if upstream_channel = @channel_map.delete(frame.channel)
upstream_channel.write(frame)
end
when frame.channel.zero?
Log.error { "Unexpected connection frame: #{frame}" }
close_connection(540_u16, "NOT_IMPLEMENTED", frame)
Expand Down
16 changes: 7 additions & 9 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ module AMQProxy
end

# Frames from upstream (to client)
private def read_loop(socket, remote_address : String) # ameba:disable Metrics/CyclomaticComplexity
private def read_loop(socket, remote_address : String)
Log.context.set(remote_address: remote_address)
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
Expand All @@ -87,14 +87,7 @@ module AMQProxy
end
return
when AMQ::Protocol::Frame::Connection::CloseOk then return
when AMQ::Protocol::Frame::Channel::OpenOk # we assume it always succeeds
when AMQ::Protocol::Frame::Channel::Close # when upstream server requested a channel close
@channels_lock.synchronize do
@unsafe_channels.delete(frame.channel)
if downstream_channel = @channels.delete(frame.channel)
downstream_channel.write frame
end
end
when AMQ::Protocol::Frame::Channel::OpenOk # we assume it always succeeds
when AMQ::Protocol::Frame::Channel::CloseOk # when channel pool requested channel close
else
if downstream_channel = @channels[frame.channel]?
Expand Down Expand Up @@ -145,6 +138,11 @@ module AMQProxy
@unsafe_channels.add(frame.channel)
when AMQ::Protocol::Frame::Connection
raise "Connection frames should not be sent through here: #{frame}"
when AMQ::Protocol::Frame::Channel::CloseOk # when upstream server requested a channel close and client confirmed
@channels_lock.synchronize do
@unsafe_channels.delete(frame.channel)
@channels.delete(frame.channel)
end
when AMQ::Protocol::Frame::Channel
raise "Channel frames should not be sent through here: #{frame}"
end
Expand Down

0 comments on commit 6366582

Please sign in to comment.