Skip to content

Commit

Permalink
send Connection.Close-ok before closing socket
Browse files Browse the repository at this point in the history
Do that by dropping the write_loop fiber
  • Loading branch information
carlhoerberg committed May 11, 2024
1 parent a451c5e commit 17b1981
Showing 1 changed file with 16 additions and 27 deletions.
43 changes: 16 additions & 27 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module AMQProxy
Log = ::Log.for(self)
getter credentials : Credentials
@channel_map = Hash(UInt16, UpstreamChannel?).new
@outgoing_frames = Channel(AMQ::Protocol::Frame).new(128)
@lock = Mutex.new
@frame_max : UInt32
@channel_max : UInt16
@heartbeat : UInt16
Expand All @@ -21,7 +21,6 @@ module AMQProxy
@frame_max = tune_ok.frame_max
@channel_max = tune_ok.channel_max
@heartbeat = tune_ok.heartbeat
spawn write_loop
end

# Keep a buffer of publish frames
Expand Down Expand Up @@ -125,36 +124,27 @@ module AMQProxy
else
Log.debug { "Disconnected" }
ensure
@outgoing_frames.close
socket.close rescue nil
close_all_upstream_channels
end

private def write_loop(socket = @socket)
while frame = @outgoing_frames.receive?
socket.write_bytes frame, IO::ByteFormat::NetworkEndian
socket.flush unless expect_more_publish_frames?(frame)
case frame
when AMQ::Protocol::Frame::Channel::Close
@channel_map[frame.channel] = nil
when AMQ::Protocol::Frame::Channel::CloseOk
@channel_map.delete(frame.channel)
when AMQ::Protocol::Frame::Connection::CloseOk
break
end
# Send frame to client, channel id should already be remapped by the caller
def write(frame : AMQ::Protocol::Frame)
@lock.synchronize do
@socket.write_bytes frame, IO::ByteFormat::NetworkEndian
@socket.flush unless expect_more_publish_frames?(frame)
end
case frame
when AMQ::Protocol::Frame::Channel::Close
@channel_map[frame.channel] = nil
when AMQ::Protocol::Frame::Channel::CloseOk
@channel_map.delete(frame.channel)
when AMQ::Protocol::Frame::Connection::CloseOk
@socket.close rescue nil
end
rescue ex : IO::Error
# Client closed connection, suppress error
ensure
@outgoing_frames.close
socket.close rescue nil
end

# Send frame to client, channel id should already be remapped by the caller
def write(frame : AMQ::Protocol::Frame)
@outgoing_frames.send frame
rescue Channel::ClosedError
# do nothing
@socket.close rescue nil
end

def close_connection(code, text, frame = nil)
Expand Down Expand Up @@ -196,9 +186,8 @@ module AMQProxy
# @socket.read_timeout = 1.seconds
end

# Close the outgoing frames channel which will let write_loop close the socket
def close_socket
@outgoing_frames.close
@socket.close rescue nil
end

private def set_socket_options(socket = @socket)
Expand Down

0 comments on commit 17b1981

Please sign in to comment.