Skip to content

Commit

Permalink
fixup! Channel pool
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Dec 11, 2023
1 parent 44f04e9 commit 01310d5
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 120 deletions.
6 changes: 3 additions & 3 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ shards:
version: 1.5.0

amq-protocol: # Overridden
path: ../amq-protocol.cr
version: 1.1.4
git: https://github.com/cloudamqp/amq-protocol.cr.git
version: 1.1.11+git.commit.eddbea6ddb19be0b1750c2760aa5c5ab0eeafed8

amqp-client:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.0.11
version: 1.1.0

logger:
git: https://github.com/84codes/logger.cr.git
Expand Down
5 changes: 3 additions & 2 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@ describe AMQProxy::Server do
queue = channel.queue(queue_name)
queue.publish_confirm(message_payload)
end
sleep 0.1
end
sleep 0.1

AMQP::Client.start("amqp://localhost:5673") do |conn|
channel = conn.channel
channel.basic_consume(queue_name, block: true, tag: "AMQProxy specs") do |msg|
channel.basic_consume(queue_name, no_ack: false, tag: "AMQProxy specs") do |msg|
body = msg.body_io.to_s
if body == message_payload
channel.basic_ack(msg.delivery_tag)
num_received_messages += 1
end
end
sleep 0.1
end

num_received_messages.should eq num_messages_to_publish
Expand Down
2 changes: 1 addition & 1 deletion src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module AMQProxy
def initialize(@host : String, @port : Int32, tls : Bool, @log, @idle_connection_timeout : Int32)
@tls_ctx = OpenSSL::SSL::Context::Client.new if tls
@upstream_channel_channel = Hash(Credentials, Channel(Tuple(Upstream, UInt16))).new do |h, k|
chan = Channel(Tuple(Upstream, UInt16)).new(128)
chan = Channel(Tuple(Upstream, UInt16)).new(8)
spawn pool_loop(k, chan)
h[k] = chan
end
Expand Down
4 changes: 3 additions & 1 deletion src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ module AMQProxy

break if frame.is_a? AMQ::Protocol::Frame::Connection::CloseOk
end
rescue ex : IO::Error
raise ex unless socket.closed?
ensure
close_socket
end
Expand Down Expand Up @@ -181,7 +183,7 @@ module AMQProxy
open_ok.to_io(socket, IO::ByteFormat::NetworkEndian)
socket.flush

{tune_ok, Credentials.new(vhost, user, password)}
{tune_ok, Credentials.new(user, password, vhost)}
rescue ex
raise NegotiationError.new "Client negotiation failed", ex
end
Expand Down
90 changes: 0 additions & 90 deletions src/amqproxy/pool.cr

This file was deleted.

28 changes: 6 additions & 22 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ require "./client"
module AMQProxy
class Upstream
property last_used = Time.monotonic
setter current_client : Client?
@socket : IO
@open_channels = Set(UInt16).new
@unsafe_channels = Set(UInt16).new
@channel_map = Hash(UInt16, Tuple(Client, UInt16)).new
@channels = Hash(UInt16, Tuple(Client, UInt16)?).new
@channels_lock = Mutex.new
@channel_max : UInt16
@lock = Mutex.new
@log : Logger
Expand All @@ -35,9 +34,6 @@ module AMQProxy
spawn read_loop, name: "upstream read loop #{@host}:#{@port}"
end

@channels = Hash(UInt16, Tuple(Client, UInt16)?).new
@channels_lock = Mutex.new

def open_channel : UInt16
@channels_lock.synchronize do
1_u16.upto(@channel_max) do |i|
Expand All @@ -60,6 +56,7 @@ module AMQProxy
def unassign_channel(channel : UInt16)
@channels_lock.synchronize do
if @unsafe_channels.includes? channel
close_channel(channel)
else
@channels[channel] = nil
end
Expand All @@ -72,6 +69,7 @@ module AMQProxy
IO::ByteFormat::NetworkEndian
@socket.flush
@channels.delete channel
@unsafe_channels.delete channel
end
end

Expand All @@ -86,7 +84,7 @@ module AMQProxy
return
when AMQ::Protocol::Frame::Connection::CloseOk then return
when AMQ::Protocol::Frame::Channel::OpenOk
when AMQ::Protocol::Frame::Channel::Close # when server requested a channel close
when AMQ::Protocol::Frame::Channel::Close # when upstream server requested a channel close
@channels_lock.synchronize do
@unsafe_channels.delete(frame.channel)
if downstream = @channels.delete(frame.channel)
Expand All @@ -96,7 +94,7 @@ module AMQProxy
end
when AMQ::Protocol::Frame::Channel::CloseOk # when channel pool requested channel close
else
if client_channel = @channel_map[frame.channel]?
if client_channel = @channels[frame.channel]?
client, frame.channel = client_channel
client.write(frame)
else
Expand Down Expand Up @@ -164,20 +162,6 @@ module AMQProxy
@socket.closed?
end

def client_disconnected
@current_client = nil
return if closed?
@lock.synchronize do
@open_channels.each do |ch|
if @unsafe_channels.includes? ch
close = AMQ::Protocol::Frame::Channel::Close.new(ch, 200_u16, "Client disconnected", 0_u16, 0_u16)
close.to_io @socket, IO::ByteFormat::NetworkEndian
@socket.flush
end
end
end
end

private def start(credentials) : UInt16
@socket.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice
@socket.flush
Expand Down
2 changes: 1 addition & 1 deletion src/amqproxy/version.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module AMQProxy
VERSION = {{ `shards version`.stringify }}
VERSION = {{ `shards version`.stringify.chomp }}
end

0 comments on commit 01310d5

Please sign in to comment.