diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index 9daff74..42806d0 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -1,6 +1,32 @@ require "./spec_helper" describe AMQProxy::Server do + it "dont reuse channels closed by upstream" do + s = AMQProxy::Server.new("127.0.0.1", 5672, false) + begin + spawn { s.listen("127.0.0.1", 5673) } + Fiber.yield + AMQP::Client.start("amqp://localhost:5673") do |conn| + ch = conn.channel + ch.basic_publish "foobar", "non-existing" + end + AMQP::Client.start("amqp://localhost:5673") do |conn| + ch = conn.channel + ch.basic_publish_confirm "foobar", "amq.fanout" + end + AMQP::Client.start("amqp://localhost:5673") do |conn| + ch = conn.channel + expect_raises(AMQP::Client::Channel::ClosedException) do + ch.basic_publish_confirm "foobar", "non-existing" + end + end + sleep 0.1 + s.upstream_connections.should eq 1 + ensure + s.stop_accepting_clients + end + end + it "keeps connections open" do s = AMQProxy::Server.new("127.0.0.1", 5672, false) begin diff --git a/src/amqproxy/channel_pool.cr b/src/amqproxy/channel_pool.cr index 65336b6..18e2f77 100644 --- a/src/amqproxy/channel_pool.cr +++ b/src/amqproxy/channel_pool.cr @@ -39,6 +39,10 @@ module AMQProxy upstream = Upstream.new(@host, @port, @tls_ctx, @credentials) Log.info { "Adding upstream connection" } @upstreams.unshift upstream + spawn do + upstream.read_loop + @upstreams.delete upstream + end rescue ex : IO::Error raise Upstream::Error.new ex.message, cause: ex end diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index 208244f..a08ee00 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -49,9 +49,8 @@ module AMQProxy end write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel) when AMQ::Protocol::Frame::Channel::CloseOk - if upstream_channel = @channel_map.delete(frame.channel) - upstream_channel.write(frame) - end + # CloseOk already sent in Upstream#read_loop + @channel_map.delete(frame.channel) when frame.channel.zero? Log.error { "Unexpected connection frame: #{frame}" } close_connection(540_u16, "NOT_IMPLEMENTED", frame) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 0494092..2a26f1a 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -13,6 +13,7 @@ module AMQProxy @channels_lock = Mutex.new @channel_max : UInt16 @lock = Mutex.new + @remote_address : String def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials) tcp_socket = TCPSocket.new(@host, @port) @@ -22,6 +23,7 @@ module AMQProxy tcp_socket.tcp_keepalive_count = 3 tcp_socket.tcp_keepalive_interval = 10 tcp_socket.tcp_nodelay = true + @remote_address = tcp_socket.remote_address.to_s @socket = if tls_ctx = @tls_ctx tls_socket = OpenSSL::SSL::Socket::Client.new(tcp_socket, tls_ctx, hostname: @host) @@ -31,7 +33,6 @@ module AMQProxy tcp_socket end @channel_max = start(credentials) - spawn read_loop(@socket, tcp_socket.remote_address.to_s) end def open_channel_for(downstream_channel : DownstreamChannel) : UpstreamChannel @@ -59,7 +60,7 @@ module AMQProxy if @unsafe_channels.delete channel send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16) @channels.delete channel - else + elsif @channels.has_key? channel @channels[channel] = nil # keep for reuse end end @@ -74,8 +75,8 @@ module AMQProxy end # Frames from upstream (to client) - private def read_loop(socket, remote_address : String) # ameba:disable Metrics/CyclomaticComplexity - Log.context.set(remote_address: remote_address) + def read_loop(socket = @socket) # ameba:disable Metrics/CyclomaticComplexity + Log.context.set(remote_address: @remote_address) loop do case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) when AMQ::Protocol::Frame::Heartbeat then send frame @@ -92,6 +93,12 @@ module AMQProxy send_to_all_clients(frame) when AMQ::Protocol::Frame::Channel::OpenOk # we assume it always succeeds when AMQ::Protocol::Frame::Channel::CloseOk # when channel pool requested channel close + when AMQ::Protocol::Frame::Channel::Close + send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel) + @unsafe_channels.delete(frame.channel) + if downstream_channel = @channels.delete(frame.channel) + downstream_channel.write(frame) + end else if downstream_channel = @channels[frame.channel]? downstream_channel.write(frame)