Skip to content

Commit

Permalink
Negotiate frame_max with Upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed May 12, 2024
1 parent 4bd3675 commit 4d8c1f7
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ require "./channel_pool"

module AMQProxy
class Upstream
Log = ::Log.for(self)
FrameMax = 4096_u32
Log = ::Log.for(self)
@socket : IO
@channels = Hash(UInt16, DownstreamChannel).new
@channels_lock = Mutex.new
@channel_max : UInt16
@lock = Mutex.new
@remote_address : String
@channel_max : UInt16
@frame_max : UInt32

def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials)
tcp_socket = TCPSocket.new(@host, @port)
Expand All @@ -32,7 +33,9 @@ module AMQProxy
else
tcp_socket
end
@channel_max = start(credentials)
tune_ok = start(credentials)
@channel_max = tune_ok.channel_max
@frame_max = tune_ok.frame_max
end

def open_channel_for(downstream_channel : DownstreamChannel) : UpstreamChannel
Expand Down Expand Up @@ -155,14 +158,10 @@ module AMQProxy

private def expect_more_publish_frames?(frame) : Bool
case frame
when AMQ::Protocol::Frame::Basic::Publish
return true
when AMQ::Protocol::Frame::Header
return true unless frame.body_size.zero?
when AMQ::Protocol::Frame::BytesBody
return true if frame.bytesize == FrameMax
when AMQ::Protocol::Frame::Basic::Publish then true
when AMQ::Protocol::Frame::Header then frame.body_size != 0
else false
end
false
end

def close(reason = "")
Expand All @@ -180,7 +179,7 @@ module AMQProxy
@socket.closed?
end

private def start(credentials) : UInt16
private def start(credentials) : AMQ::Protocol::Frame::Connection::TuneOk
@socket.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice
@socket.flush

Expand All @@ -195,7 +194,8 @@ module AMQProxy
case tune = AMQ::Protocol::Frame.from_io(@socket)
when AMQ::Protocol::Frame::Connection::Tune
channel_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max
tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, FrameMax, tune.heartbeat)
frame_max = tune.frame_max.zero? ? 131072_u32 : Math.min(131072_u32, tune.frame_max)
tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, frame_max, tune.heartbeat)
@socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian
@socket.flush
when AMQ::Protocol::Frame::Connection::Close
Expand All @@ -217,7 +217,7 @@ module AMQProxy
else
raise "Unexpected frame on connection to upstream: #{f}"
end
channel_max
tune_ok
rescue ex : AccessError
raise ex
rescue ex
Expand Down

0 comments on commit 4d8c1f7

Please sign in to comment.