Skip to content

Commit

Permalink
intercept Channel.close from server, always reply CloseOk
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed May 10, 2024
1 parent 9ecdca1 commit 2ee8bf8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
5 changes: 4 additions & 1 deletion src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ module AMQProxy

# Send frame to client, channel id should already be remapped by the caller
def write(frame : AMQ::Protocol::Frame)
case frame
when AMQ::Protocol::Frame::Channel::Close
@channel_map[frame.channel] = nil
end
@outgoing_frames.send frame
rescue Channel::ClosedError
# do nothing
Expand All @@ -123,7 +127,6 @@ module AMQProxy

def close_channel(id, code, reason)
write AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
@channel_map[id] = nil
end

private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED")
Expand Down
7 changes: 6 additions & 1 deletion src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ module AMQProxy
when AMQ::Protocol::Frame::Connection::Blocked,
AMQ::Protocol::Frame::Connection::Unblocked
send_to_all_clients(frame)
when AMQ::Protocol::Frame::Channel::OpenOk # we assume it always succeeds
when AMQ::Protocol::Frame::Channel::OpenOk # assume it always succeeds
when AMQ::Protocol::Frame::Channel::Close
send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
if downstream_channel = @channels_lock.synchronize { @channels.delete(frame.channel) }
downstream_channel.write frame
end
when AMQ::Protocol::Frame::Channel::CloseOk # when client requested channel close
@channels_lock.synchronize { @channels.delete(frame.channel) }
else
Expand Down

0 comments on commit 2ee8bf8

Please sign in to comment.