Skip to content

Commit

Permalink
fixup! Channel pool
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Feb 19, 2024
1 parent 67dff54 commit 0210965
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 58 deletions.
3 changes: 2 additions & 1 deletion src/amqproxy.cr
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ class AMQProxy::CLI
if first_shutdown
first_shutdown = false
server.stop_accepting_clients
else
server.disconnect_clients
else
server.close_sockets
end
end
Signal::INT.trap &shutdown
Expand Down
8 changes: 5 additions & 3 deletions src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ module AMQProxy
end

private def add_upstream
Log.info { "Adding upstream connection" }
upstream = Upstream.new(@host, @port, @tls_ctx, @credentials)
Log.info { "Adding upstream connection" }
@upstreams.unshift upstream
rescue ex : IO::Error
raise Upstream::Error.new ex.message, cause: ex
end

def connections
Expand All @@ -62,9 +64,9 @@ module AMQProxy
loop do
sleep @idle_connection_timeout.seconds
@lock.synchronize do
(@upstreams.size - 1).times do
(@upstreams.size - 1).times do # leave at least one connection
u = @upstreams.pop
if u.channels.zero?
if u.active_channels.zero?
begin
u.close "Pooled connection closed due to inactivity"
rescue ex
Expand Down
47 changes: 39 additions & 8 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module AMQProxy
@heartbeat : UInt16

def initialize(@socket : TCPSocket)
set_socket_options(@socket)
tune_ok, @credentials = negotiate(@socket)
@frame_max = tune_ok.frame_max
@channel_max = tune_ok.channel_max
Expand All @@ -24,14 +25,15 @@ module AMQProxy

# frames from enduser
def read_loop(channel_pool, socket = @socket) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(remote_address: socket.remote_address.to_s)
Log.debug { "Connected" }
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Heartbeat
write frame
when AMQ::Protocol::Frame::Heartbeat then write frame
when AMQ::Protocol::Frame::Connection::CloseOk then return
when AMQ::Protocol::Frame::Connection::Close
close_all_upstream_channels
write AMQ::Protocol::Frame::Connection::CloseOk.new
when AMQ::Protocol::Frame::Connection::CloseOk
return
when AMQ::Protocol::Frame::Channel::Open
raise "Channel already opened" if @channel_map.has_key? frame.channel
Expand All @@ -46,7 +48,8 @@ module AMQProxy
when AMQ::Protocol::Frame::Channel::CloseOk
# noop
when frame.channel.zero?
raise "unexpected connection frame: #{frame}"
Log.error { "Unexpected connection frame: #{frame}" }
close_connection(540_u16, "NOT_IMPLEMENTED", frame)
else
src_channel = frame.channel
begin
Expand All @@ -55,12 +58,22 @@ module AMQProxy
rescue ex : Upstream::WriteError
close_channel(src_channel)
rescue KeyError
raise "channel not open"
close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame)
end
end
end
rescue ex : IO::EOFError
Log.debug { "Disconnected" }
rescue ex : IO::Error
raise Error.new("Client disconnected", ex) unless socket.closed?
Log.error(exception: ex) { "IO error" } unless socket.closed?
rescue ex : Upstream::AccessError
Log.error { "Access refused, reason: #{ex.message}" }
close_connection(403_u16, ex.message || "ACCESS_REFUSED")
rescue ex : Upstream::Error
Log.error(exception: ex) { "Upstream error" }
close_connection(503_u16, "UPSTREAM_ERROR - #{ex.message}")
else
Log.debug { "Disconnected" }
ensure
@outgoing_frames.close
close_all_upstream_channels
Expand All @@ -86,6 +99,15 @@ module AMQProxy
@outgoing_frames.send frame
end

def close_connection(code, text, frame = nil)
case frame
when AMQ::Protocol::Frame::Method
write AMQ::Protocol::Frame::Connection::Close.new(code, text, frame.class_id, frame.method_id)
else
write AMQ::Protocol::Frame::Connection::Close.new(code, text, 0_u16, 0_u16)
end
end

def close_channel(id)
write AMQ::Protocol::Frame::Channel::Close.new(id, 500_u16, "UPSTREAM_DISCONNECTED", 0_u16, 0_u16)
end
Expand Down Expand Up @@ -120,7 +142,16 @@ module AMQProxy
@outgoing_frames.close
end

def negotiate(socket = @socket)
private def set_socket_options(socket = @socket)
socket.sync = false
socket.keepalive = true
socket.tcp_nodelay = true
socket.tcp_keepalive_idle = 60
socket.tcp_keepalive_count = 3
socket.tcp_keepalive_interval = 10
end

private def negotiate(socket = @socket)
proto = uninitialized UInt8[8]
socket.read_fully(proto.to_slice)

Expand Down Expand Up @@ -178,7 +209,7 @@ module AMQProxy
capabilities: {
consumer_priorities: true,
exchange_exchange_bindings: true,
"connection.blocked": true,
"connection.blocked": false,
authentication_failure_close: true,
per_consumer_qos: true,
"basic.nack": true,
Expand Down
41 changes: 12 additions & 29 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -28,59 +28,42 @@ module AMQProxy
end

def listen(address, port)
@socket = socket = TCPServer.new(address, port)
Log.info { "Proxy listening on #{socket.local_address}" }
while client = socket.accept?
addr = client.remote_address
spawn handle_connection(client, addr), name: "handle connection #{addr}"
@server = server = TCPServer.new(address, port)
Log.info { "Proxy listening on #{server.local_address}" }
while socket = server.accept?
addr = socket.remote_address
spawn handle_connection(socket, addr), name: "Client#read_loop #{addr}"
end
Log.info { "Proxy stopping accepting connections" }
end

def stop_accepting_clients
@socket.try &.close
@server.try &.close
end

def disconnect_clients
Log.info { "Disconnecting clients" }
@clients_lock.synchronize do
@clients.each &.close # send Connection#Close frames
end
sleep 1 # wait for clients to disconnect voluntarily
end

def close_sockets
Log.info { "Closing client sockets" }
@clients_lock.synchronize do
@clients.each &.close_socket # close sockets forcefully
end
end

private def handle_connection(socket, remote_address)
socket.sync = false
socket.keepalive = true
socket.tcp_nodelay = true
socket.tcp_keepalive_idle = 60
socket.tcp_keepalive_count = 3
socket.tcp_keepalive_interval = 10
Log.debug { "Client connected: #{remote_address}" }
c = Client.new(socket)
active_client(c) do
channel_pool = @channel_pools[c.credentials]
c.read_loop(channel_pool)
rescue ex : Upstream::AccessError
Log.error { "Access refused, reason: #{ex.message}" }
close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "ACCESS_REFUSED - #{ex.message}", 0_u16, 0_u16)
close.to_io socket, IO::ByteFormat::NetworkEndian
socket.flush
rescue ex : Upstream::Error
Log.error { "Upstream error: #{ex.inspect} (cause: #{ex.cause.inspect})" }
close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "UPSTREAM_ERROR", 0_u16, 0_u16)
close.to_io socket, IO::ByteFormat::NetworkEndian
socket.flush
end
rescue ex : Client::Error
Log.debug { "Client disconnected: #{remote_address}: #{ex.inspect}" }
ensure
Log.debug { "Client disconnected: #{remote_address}" }
socket.close rescue nil
rescue ex # only raise from constructor, when negotating
Log.debug { "Client connection failure (#{remote_address}) #{ex.inspect}" }
socket.close
end

private def active_client(client, &)
Expand Down
53 changes: 36 additions & 17 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module AMQProxy
tcp_socket
end
@channel_max = start(credentials)
spawn read_loop
spawn read_loop(@socket, tcp_socket.remote_address.to_s)
end

def open_channel_for(downstream_channel : DownstreamChannel) : UpstreamChannel
Expand Down Expand Up @@ -69,8 +69,13 @@ module AMQProxy
@channels.size
end

def active_channels
@channels.count { |_, v| !v.nil? }
end

# Frames from upstream (to client)
private def read_loop(socket = @socket) # ameba:disable Metrics/CyclomaticComplexity
private def read_loop(socket, remote_address : String) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(remote_address: remote_address)
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Heartbeat then send frame
Expand Down Expand Up @@ -113,11 +118,15 @@ module AMQProxy
end

private def close_all_client_channels
Log.debug { "Closing all client channels for closed upstream" }
@channels_lock.synchronize do
cnt = 0
@channels.each_value do |downstream_channel|
downstream_channel.try &.close
if dch = downstream_channel
dch.close
cnt += 1
end
end
Log.debug { "Upstream connection closed, closing #{cnt} client channels" } unless cnt.zero?
@channels.clear
end
end
Expand Down Expand Up @@ -175,29 +184,33 @@ module AMQProxy

response = "\u0000#{credentials.user}\u0000#{credentials.password}"
start_ok = AMQ::Protocol::Frame::Connection::StartOk.new(response: response, client_properties: ClientProperties, mechanism: "PLAIN", locale: "en_US")
start_ok.to_io @socket, IO::ByteFormat::NetworkEndian
@socket.write_bytes start_ok, IO::ByteFormat::NetworkEndian
@socket.flush

tune = AMQ::Protocol::Frame.from_io(@socket).as(AMQ::Protocol::Frame::Connection::Tune)
channel_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max
tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, tune.frame_max, tune.heartbeat)
tune_ok.to_io @socket, IO::ByteFormat::NetworkEndian
@socket.flush
case tune = AMQ::Protocol::Frame.from_io(@socket)
when AMQ::Protocol::Frame::Connection::Tune
channel_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max
tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, tune.frame_max, tune.heartbeat)
@socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian
@socket.flush
when AMQ::Protocol::Frame::Connection::Close
send_close_ok
raise AccessError.new tune.reply_text
else
raise "Unexpected frame on connection to upstream: #{tune}"
end

open = AMQ::Protocol::Frame::Connection::Open.new(vhost: credentials.vhost)
open.to_io @socket, IO::ByteFormat::NetworkEndian
@socket.write_bytes open, IO::ByteFormat::NetworkEndian
@socket.flush

case f = AMQ::Protocol::Frame.from_io(@socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Connection::OpenOk
when AMQ::Protocol::Frame::Connection::Close
close_ok = AMQ::Protocol::Frame::Connection::CloseOk.new
close_ok.to_io @socket, IO::ByteFormat::NetworkEndian
@socket.flush
@socket.close
send_close_ok
raise AccessError.new f.reply_text
else
raise "Unexpeceted frame on connection to upstream: #{f}"
raise "Unexpected frame on connection to upstream: #{f}"
end
channel_max
rescue ex : AccessError
Expand All @@ -207,14 +220,20 @@ module AMQProxy
raise Error.new ex.message, cause: ex
end

private def send_close_ok
@socket.write_bytes AMQ::Protocol::Frame::Connection::CloseOk.new, IO::ByteFormat::NetworkEndian
@socket.flush
@socket.close
end

ClientProperties = AMQ::Protocol::Table.new({
connection_name: "AMQProxy #{VERSION}",
product: "AMQProxy",
version: VERSION,
capabilities: {
consumer_priorities: true,
exchange_exchange_bindings: true,
"connection.blocked": true,
"connection.blocked": false,
authentication_failure_close: true,
per_consumer_qos: true,
"basic.nack": true,
Expand Down

0 comments on commit 0210965

Please sign in to comment.