Skip to content

Commit

Permalink
close upstream channel on channel errors
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg authored and spuun committed May 6, 2024
1 parent 6f57fdc commit d41944e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 7 deletions.
26 changes: 26 additions & 0 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
require "./spec_helper"

describe AMQProxy::Server do
it "dont reuse channels closed by upstream" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
AMQP::Client.start("amqp://localhost:5673") do |conn|
ch = conn.channel
ch.basic_publish "foobar", "non-existing"
end
AMQP::Client.start("amqp://localhost:5673") do |conn|
ch = conn.channel
ch.basic_publish_confirm "foobar", "amq.fanout"
end
AMQP::Client.start("amqp://localhost:5673") do |conn|
ch = conn.channel
expect_raises(AMQP::Client::Channel::ClosedException) do
ch.basic_publish_confirm "foobar", "non-existing"
end
end
sleep 0.1
s.upstream_connections.should eq 1
ensure
s.stop_accepting_clients
end
end

it "keeps connections open" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
Expand Down
4 changes: 4 additions & 0 deletions src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ module AMQProxy
upstream = Upstream.new(@host, @port, @tls_ctx, @credentials)
Log.info { "Adding upstream connection" }
@upstreams.unshift upstream
spawn do
upstream.read_loop
@upstreams.delete upstream
end
rescue ex : IO::Error
raise Upstream::Error.new ex.message, cause: ex
end
Expand Down
5 changes: 2 additions & 3 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ module AMQProxy
end
write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::CloseOk
if upstream_channel = @channel_map.delete(frame.channel)
upstream_channel.write(frame)
end
# CloseOk already sent in Upstream#read_loop
@channel_map.delete(frame.channel)
when frame.channel.zero?
Log.error { "Unexpected connection frame: #{frame}" }
close_connection(540_u16, "NOT_IMPLEMENTED", frame)
Expand Down
15 changes: 11 additions & 4 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module AMQProxy
@channels_lock = Mutex.new
@channel_max : UInt16
@lock = Mutex.new
@remote_address : String

def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials)
tcp_socket = TCPSocket.new(@host, @port)
Expand All @@ -22,6 +23,7 @@ module AMQProxy
tcp_socket.tcp_keepalive_count = 3
tcp_socket.tcp_keepalive_interval = 10
tcp_socket.tcp_nodelay = true
@remote_address = tcp_socket.remote_address.to_s
@socket =
if tls_ctx = @tls_ctx
tls_socket = OpenSSL::SSL::Socket::Client.new(tcp_socket, tls_ctx, hostname: @host)
Expand All @@ -31,7 +33,6 @@ module AMQProxy
tcp_socket
end
@channel_max = start(credentials)
spawn read_loop(@socket, tcp_socket.remote_address.to_s)
end

def open_channel_for(downstream_channel : DownstreamChannel) : UpstreamChannel
Expand Down Expand Up @@ -59,7 +60,7 @@ module AMQProxy
if @unsafe_channels.delete channel
send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16)
@channels.delete channel
else
elsif @channels.has_key? channel
@channels[channel] = nil # keep for reuse
end
end
Expand All @@ -74,8 +75,8 @@ module AMQProxy
end

# Frames from upstream (to client)
private def read_loop(socket, remote_address : String) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(remote_address: remote_address)
def read_loop(socket = @socket) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(remote_address: @remote_address)
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Heartbeat then send frame
Expand All @@ -92,6 +93,12 @@ module AMQProxy
send_to_all_clients(frame)
when AMQ::Protocol::Frame::Channel::OpenOk # we assume it always succeeds
when AMQ::Protocol::Frame::Channel::CloseOk # when channel pool requested channel close
when AMQ::Protocol::Frame::Channel::Close
send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
@unsafe_channels.delete(frame.channel)
if downstream_channel = @channels.delete(frame.channel)
downstream_channel.write(frame)
end
else
if downstream_channel = @channels[frame.channel]?
downstream_channel.write(frame)
Expand Down

0 comments on commit d41944e

Please sign in to comment.