Skip to content

Commit

Permalink
Channel pool
Browse files Browse the repository at this point in the history
Instead of giving each client connection an upstream connection, each
client channel is given an upstream channel upon request, which means that multiple
client connections can share a single upstream connection, massively
reducing resources required of the upstream server.
  • Loading branch information
carlhoerberg committed Feb 20, 2024
1 parent 40cc76d commit 54fdfd3
Show file tree
Hide file tree
Showing 15 changed files with 559 additions and 420 deletions.
2 changes: 2 additions & 0 deletions .ameba.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Excluded:
- test/
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
*.tar.gz
*.swp
/builds/
shard.override.yml
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [v2.0.0] - 2024-02-19

- Rewrite of the proxy where channels are pooled rather than connections. When a client opens a channel it gets a channel on a shared upstream connection, and the proxy remaps the channel numbers between the two. Many client connections can therefore share a single upstream connection. The upside is that far fewer connections to the upstream server are needed; the downside is that if the server closes the connection because of a misbehaving client, the channels of all other clients on that shared connection are closed as well.

## [v1.0.0] - 2024-02-19

- Nothing changed from v0.8.14
Expand Down
10 changes: 3 additions & 7 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ version: 2.0
shards:
ameba:
git: https://github.com/crystal-ameba/ameba.git
version: 1.5.0
version: 1.6.1

amq-protocol:
git: https://github.com/cloudamqp/amq-protocol.cr.git
version: 1.1.4
version: 1.1.14

amqp-client:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.0.11

logger:
git: https://github.com/84codes/logger.cr.git
version: 1.0.2
version: 1.2.1

4 changes: 1 addition & 3 deletions shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: amqproxy
version: 1.0.0
version: 2.0.0

authors:
- CloudAMQP <[email protected]>
Expand All @@ -11,8 +11,6 @@ targets:
dependencies:
amq-protocol:
github: cloudamqp/amq-protocol.cr
logger:
github: 84codes/logger.cr

development_dependencies:
amqp-client:
Expand Down
47 changes: 35 additions & 12 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ require "./spec_helper"

describe AMQProxy::Server do
it "keeps connections open" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
10.times do
AMQP::Client.start("amqp://localhost:5673") do |conn|
conn.channel
ch = conn.channel
ch.basic_publish "foobar", "amq.fanout", ""
s.client_connections.should eq 1
s.upstream_connections.should eq 1
end
sleep 0.1
end
s.client_connections.should eq 0
s.upstream_connections.should eq 1
Expand All @@ -22,7 +22,7 @@ describe AMQProxy::Server do
end

it "publish and consume works" do
server = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
server = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { server.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -38,18 +38,19 @@ describe AMQProxy::Server do
queue = channel.queue(queue_name)
queue.publish_confirm(message_payload)
end
sleep 0.1
end
sleep 0.1

AMQP::Client.start("amqp://localhost:5673") do |conn|
channel = conn.channel
channel.basic_consume(queue_name, block: true, tag: "AMQProxy specs") do |msg|
channel.basic_consume(queue_name, no_ack: false, tag: "AMQProxy specs") do |msg|
body = msg.body_io.to_s
if body == message_payload
channel.basic_ack(msg.delivery_tag)
num_received_messages += 1
end
end
sleep 0.1
end

num_received_messages.should eq num_messages_to_publish
Expand All @@ -58,8 +59,30 @@ describe AMQProxy::Server do
end
end

# Verifies that a single client connection can open as many channels as it
# negotiated (channel_max), and that the proxy serves them from more than one
# upstream connection.
it "a client can open all channels" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
# Let the listener fiber run before connecting to it.
Fiber.yield
max = 4000
AMQP::Client.start("amqp://localhost:5673?channel_max=#{max}") do |conn|
conn.channel_max.should eq max
# Open every channel the client connection is allowed to have.
conn.channel_max.times do
conn.channel
end
s.client_connections.should eq 1
# Two upstream connections are expected for 4000 client channels
# (presumably the upstream's per-connection channel limit is below 4000 — TODO confirm).
s.upstream_connections.should eq 2
end
# Give the proxy a moment to notice that the client disconnected.
sleep 0.1
s.client_connections.should eq 0
# The upstream connections are expected to still be open right after the client leaves.
s.upstream_connections.should eq 2
ensure
s.stop_accepting_clients
end
end

it "can reconnect if upstream closes" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -83,7 +106,7 @@ describe AMQProxy::Server do

it "responds to upstream heartbeats" do
system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1).' > /dev/null").should be_true
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -102,7 +125,7 @@ describe AMQProxy::Server do
it "supports waiting for client connections on graceful shutdown" do
started = Time.utc.to_unix

s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5)
s = AMQProxy::Server.new("127.0.0.1", 5672, false, 5)
wait_for_channel = Channel(Int32).new # channel used to wait for certain calls, to test certain behaviour
spawn do
s.listen("127.0.0.1", 5673)
Expand Down Expand Up @@ -133,11 +156,11 @@ describe AMQProxy::Server do
end
wait_for_channel.receive.should eq 2 # wait 2
s.client_connections.should eq 2
s.upstream_connections.should eq 2
s.upstream_connections.should eq 1
spawn s.stop_accepting_clients
wait_for_channel.receive.should eq 3 # wait 3
s.client_connections.should eq 1
s.upstream_connections.should eq 2 # since connection stays open
s.upstream_connections.should eq 1 # since connection stays open
spawn do
begin
AMQP::Client.start("amqp://localhost:5673") do |conn|
Expand All @@ -153,7 +176,7 @@ describe AMQProxy::Server do
end
wait_for_channel.receive.should eq 4 # wait 4
s.client_connections.should eq 1 # since the new connection should not have worked
s.upstream_connections.should eq 2 # since connections stay open
s.upstream_connections.should eq 1 # since connections stay open
wait_for_channel.receive.should eq 5 # wait 5
s.client_connections.should eq 0 # since now the server should be closed
s.upstream_connections.should eq 1
Expand Down
30 changes: 0 additions & 30 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,3 @@ require "../src/amqproxy/version"
require "amqp-client"

MAYBE_SUDO = (ENV.has_key?("NO_SUDO") || `id -u` == "0\n") ? "" : "sudo "

# Spec timeout borrowed from Crystal project:
# https://github.com/crystal-lang/crystal/blob/1.10.1/spec/support/mt_abort_timeout.cr

private SPEC_TIMEOUT = 15.seconds

Spec.around_each do |example|
done = Channel(Exception?).new

spawn(same_thread: true) do
begin
example.run
rescue e
done.send(e)
else
done.send(nil)
end
end

timeout = SPEC_TIMEOUT

select
when res = done.receive
raise res if res
when timeout(timeout)
_it = example.example
ex = Spec::AssertionFailed.new("spec timed out after #{timeout}", _it.file, _it.line)
_it.parent.report(:fail, _it.description, _it.file, _it.line, timeout, ex)
end
end
44 changes: 37 additions & 7 deletions src/amqproxy.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ require "./amqproxy/server"
require "option_parser"
require "uri"
require "ini"
require "logger"
require "log"

class AMQProxy::CLI
@listen_address = ENV["LISTEN_ADDRESS"]? || "localhost"
@listen_port = ENV["LISTEN_PORT"]? || 5673
@log_level : Logger::Severity = Logger::INFO
@log_level : Log::Severity = Log::Severity::Info
@idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i
@upstream = ENV["AMQP_URL"]?

Expand All @@ -19,7 +19,7 @@ class AMQProxy::CLI
section.each do |key, value|
case key
when "upstream" then @upstream = value
when "log_level" then @log_level = Logger::Severity.parse(value)
when "log_level" then @log_level = Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
end
Expand All @@ -29,7 +29,7 @@ class AMQProxy::CLI
case key
when "port" then @listen_port = value
when "bind", "address" then @listen_address = value
when "log_level" then @log_level = Logger::Severity.parse(value)
when "log_level" then @log_level = Log::Severity.parse(value)
else raise "Unsupported config #{name}/#{key}"
end
end
Expand All @@ -50,7 +50,7 @@ class AMQProxy::CLI
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
end
parser.on("-d", "--debug", "Verbose logging") { @log_level = Logger::DEBUG }
parser.on("-d", "--debug", "Verbose logging") { @log_level = Log::Severity::Debug }
parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) }
parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 }
parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 }
Expand All @@ -71,15 +71,23 @@ class AMQProxy::CLI
port = u.port || default_port
tls = u.scheme == "amqps"

server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout)
log_backend = if ENV.has_key?("JOURNAL_STREAM")
Log::IOBackend.new(formatter: JournalLogFormat, dispatcher: ::Log::DirectDispatcher)
else
Log::IOBackend.new(formatter: StdoutLogFormat, dispatcher: ::Log::DirectDispatcher)
end
Log.setup_from_env(default_level: @log_level, backend: log_backend)

server = AMQProxy::Server.new(u.host || "", port, tls, @idle_connection_timeout)

first_shutdown = true
shutdown = ->(_s : Signal) do
if first_shutdown
first_shutdown = false
server.stop_accepting_clients
else
server.disconnect_clients
else
server.close_sockets
end
end
Signal::INT.trap &shutdown
Expand All @@ -92,6 +100,28 @@ class AMQProxy::CLI
sleep 0.2
end
end

# Log line format used when the JOURNAL_STREAM environment variable is set.
# Emits, in order: the log source, the context wrapped in '[' and ']', a single
# space, the message and the exception. Unlike StdoutLogFormat it writes no
# timestamp or severity (presumably because the journal records those itself —
# TODO confirm).
struct JournalLogFormat < Log::StaticFormatter
def run
source
context(before: '[', after: ']')
string ' '
message
exception
end
end

# Log line format used when JOURNAL_STREAM is not set in the environment.
# Emits, in order: timestamp, severity, the log source preceded by a space,
# the context wrapped in '[' and ']', a single space, the message and the
# exception.
struct StdoutLogFormat < Log::StaticFormatter
def run
timestamp
severity
source(before: ' ')
context(before: '[', after: ']')
string ' '
message
exception
end
end
end

AMQProxy::CLI.new.run
85 changes: 85 additions & 0 deletions src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
require "openssl"
require "log"
require "./records"
require "./upstream"

module AMQProxy
  # A pool of upstream connections for one set of credentials.
  #
  # Downstream (client) channels ask the pool for an upstream channel via
  # `#get`; the pool opens that channel on an existing upstream connection when
  # it can and adds a new upstream connection when it cannot. A background
  # fiber periodically closes upstream connections that have no active
  # channels.
  class ChannelPool
    Log = ::Log.for(self)
    @lock = Mutex.new
    @upstreams = Deque(Upstream).new

    # Stores the upstream connection parameters and starts the fiber that
    # shrinks the pool every *idle_connection_timeout* seconds.
    def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32)
      spawn(name: "shrink pool loop") { shrink_pool_loop }
    end

    # Returns an upstream channel opened for *downstream_channel*.
    #
    # Upstreams are tried from the front of the pool. Closed upstreams are
    # dropped, upstreams that raise `Upstream::ChannelMaxReached` are moved to
    # the back, and a new upstream is added when the pool is empty or every
    # pooled upstream has reported being at its channel limit.
    #
    # Any other exception raised by `open_channel_for` propagates to the
    # caller; in that case the upstream has already been taken out of the pool.
    # Errors from `add_upstream` propagate as well.
    def get(downstream_channel : DownstreamChannel) : UpstreamChannel
      full_upstreams = 0
      @lock.synchronize do
        loop do
          candidate = @upstreams.shift?
          if candidate.nil?
            # Nothing left to try: grow the pool and go around again.
            add_upstream
            next
          end
          # A closed upstream is simply not put back.
          next if candidate.closed?
          begin
            channel = candidate.open_channel_for(downstream_channel)
            # Keep the upstream that just served us at the front.
            @upstreams.unshift(candidate)
            return channel
          rescue Upstream::ChannelMaxReached
            # Park the full upstream at the back and count it; once every
            # pooled upstream has been counted a new one is needed.
            @upstreams.push(candidate)
            full_upstreams += 1
            add_upstream if full_upstreams == @upstreams.size
          end
        end
      end
    end

    # Connects a new upstream and puts it at the front of the pool.
    # An `IO::Error` while doing so is re-raised as `Upstream::Error`.
    private def add_upstream
      fresh = Upstream.new(@host, @port, @tls_ctx, @credentials)
      Log.info { "Adding upstream connection" }
      @upstreams.unshift fresh
    rescue io_error : IO::Error
      raise Upstream::Error.new io_error.message, cause: io_error
    end

    # Number of upstream connections currently held by the pool.
    def connections
      @upstreams.size
    end

    # Empties the pool, closing every upstream connection. Failures while
    # closing are logged and do not stop the remaining upstreams from being
    # closed.
    def close
      Log.info { "Closing all upstream connections" }
      @lock.synchronize do
        while upstream = @upstreams.shift?
          close_upstream upstream, "AMQProxy shutdown"
        end
      end
    end

    # Runs forever: after each idle timeout, inspects all but one of the pooled
    # upstreams, taking them from the back of the pool.
    private def shrink_pool_loop
      loop do
        sleep @idle_connection_timeout.seconds
        @lock.synchronize do
          inspect_count = @upstreams.size - 1 # leave at least one connection
          inspect_count.times do
            upstream = @upstreams.pop
            # NOTE(review): the idle check runs before the closed check, so an
            # upstream that is both closed and idle gets `close` called on it —
            # confirm that is intended.
            if upstream.active_channels.zero?
              close_upstream upstream, "Pooled connection closed due to inactivity"
              next
            end
            if upstream.closed?
              Log.error { "Removing closed upstream connection from pool" }
              next
            end
            # Still in use: put it back at the front of the pool.
            @upstreams.unshift upstream
          end
        end
      end
    end

    # Closes *upstream* with *reason*; any exception is logged, not raised.
    private def close_upstream(upstream : Upstream, reason : String)
      upstream.close reason
    rescue ex
      Log.error { "Problem closing upstream: #{ex.inspect}" }
    end
  end
end
Loading

0 comments on commit 54fdfd3

Please sign in to comment.