Skip to content

Commit

Permalink
refactor send frames to upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Jan 18, 2024
1 parent 46e14ca commit 17f0b36
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ module AMQProxy
1_u16.upto(@channel_max) do |i|
next if @channels.has_key? i
@channels[i] = nil
@socket.write_bytes AMQ::Protocol::Frame::Channel::Open.new(i), IO::ByteFormat::NetworkEndian
@socket.flush
send AMQ::Protocol::Frame::Channel::Open.new(i)
return i
end
raise ChannelMaxReached.new
Expand All @@ -65,9 +64,7 @@ module AMQProxy

def close_channel(channel : UInt16)
@channels_lock.synchronize do
@socket.write_bytes AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16),
IO::ByteFormat::NetworkEndian
@socket.flush
send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16)
@channels.delete channel
@unsafe_channels.delete channel
end
Expand All @@ -77,10 +74,10 @@ module AMQProxy
private def read_loop(socket = @socket)
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Heartbeat then write frame
when AMQ::Protocol::Frame::Heartbeat then send frame
when AMQ::Protocol::Frame::Connection::Close
close_all_client_channels
write AMQ::Protocol::Frame::Connection::CloseOk.new
send AMQ::Protocol::Frame::Connection::CloseOk.new
return
when AMQ::Protocol::Frame::Connection::CloseOk then return
when AMQ::Protocol::Frame::Channel::OpenOk
Expand Down Expand Up @@ -121,7 +118,7 @@ module AMQProxy
end
end

# Send frames to upstream from client
# Forward frames from client to upstream
def write(frame : AMQ::Protocol::Frame) : Nil
case frame
when AMQ::Protocol::Frame::Basic::Publish, AMQ::Protocol::Frame::Basic::Qos
Expand All @@ -138,6 +135,10 @@ module AMQProxy
when AMQ::Protocol::Frame::Channel
raise "Channel frames should not be sent through here: #{frame}"
end
send frame
end

private def send(frame : AMQ::Protocol::Frame) : Nil
@lock.synchronize do
@socket.write_bytes frame, IO::ByteFormat::NetworkEndian
@socket.flush
Expand Down

0 comments on commit 17f0b36

Please sign in to comment.