From 4d8c1f72a6d24d02ce78d2940bcec74648a43d1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sun, 12 May 2024 14:30:26 +0200 Subject: [PATCH] Negotiate frame_max with Upstream --- src/amqproxy/upstream.cr | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 03e7369..d767187 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -6,14 +6,15 @@ require "./channel_pool" module AMQProxy class Upstream - Log = ::Log.for(self) - FrameMax = 4096_u32 + Log = ::Log.for(self) @socket : IO @channels = Hash(UInt16, DownstreamChannel).new @channels_lock = Mutex.new @channel_max : UInt16 @lock = Mutex.new @remote_address : String + @channel_max : UInt16 + @frame_max : UInt32 def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials) tcp_socket = TCPSocket.new(@host, @port) @@ -32,7 +33,9 @@ module AMQProxy else tcp_socket end - @channel_max = start(credentials) + tune_ok = start(credentials) + @channel_max = tune_ok.channel_max + @frame_max = tune_ok.frame_max end def open_channel_for(downstream_channel : DownstreamChannel) : UpstreamChannel @@ -155,14 +158,10 @@ module AMQProxy private def expect_more_publish_frames?(frame) : Bool case frame - when AMQ::Protocol::Frame::Basic::Publish - return true - when AMQ::Protocol::Frame::Header - return true unless frame.body_size.zero? - when AMQ::Protocol::Frame::BytesBody - return true if frame.bytesize == FrameMax + when AMQ::Protocol::Frame::Basic::Publish then true + when AMQ::Protocol::Frame::Header then frame.body_size != 0 + else false end - false end def close(reason = "") @@ -180,7 +179,7 @@ module AMQProxy @socket.closed? end - private def start(credentials) : UInt16 + private def start(credentials) : AMQ::Protocol::Frame::Connection::TuneOk @socket.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice @socket.flush @@ -195,7 +194,8 @@ module AMQProxy case tune = AMQ::Protocol::Frame.from_io(@socket) when AMQ::Protocol::Frame::Connection::Tune channel_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max - tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, FrameMax, tune.heartbeat) + frame_max = tune.frame_max.zero? ? 131072_u32 : Math.min(131072_u32, tune.frame_max) + tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, frame_max, tune.heartbeat) @socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian @socket.flush when AMQ::Protocol::Frame::Connection::Close @@ -217,7 +217,7 @@ module AMQProxy else raise "Unexpected frame on connection to upstream: #{f}" end - channel_max + tune_ok rescue ex : AccessError raise ex rescue ex