From 2ee8bf864ce270fe4ebab99f81d81ff182ff23ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sat, 11 May 2024 00:53:49 +0200 Subject: [PATCH] intercept Channel.close from server, always reply CloseOk --- src/amqproxy/client.cr | 5 ++++- src/amqproxy/upstream.cr | 7 ++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index 9ec43bb..2e337b1 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -107,6 +107,10 @@ module AMQProxy # Send frame to client, channel id should already be remapped by the caller def write(frame : AMQ::Protocol::Frame) + case frame + when AMQ::Protocol::Frame::Channel::Close + @channel_map[frame.channel] = nil + end @outgoing_frames.send frame rescue Channel::ClosedError # do nothing @@ -123,7 +127,6 @@ module AMQProxy def close_channel(id, code, reason) write AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16) - @channel_map[id] = nil end private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED") diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 774a714..a669646 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -74,7 +74,12 @@ module AMQProxy when AMQ::Protocol::Frame::Connection::Blocked, AMQ::Protocol::Frame::Connection::Unblocked send_to_all_clients(frame) - when AMQ::Protocol::Frame::Channel::OpenOk # we assume it always succeeds + when AMQ::Protocol::Frame::Channel::OpenOk # assume it always succeeds + when AMQ::Protocol::Frame::Channel::Close + send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel) + if downstream_channel = @channels_lock.synchronize { @channels.delete(frame.channel) } + downstream_channel.write frame + end when AMQ::Protocol::Frame::Channel::CloseOk # when client requested channel close @channels_lock.synchronize { @channels.delete(frame.channel) } else