Skip to content

Commit

Permalink
send paparms in url
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Jan 22, 2024
1 parent 6d866ec commit a6878da
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module LavinMQ
property log_file : String? = nil
property log_level : Log::Severity = DEFAULT_LOG_LEVEL
property amqp_bind = "127.0.0.1"
property amqp_port = 5673
property amqp_port = 5672
property amqps_port = -1
property unix_path = ""
property unix_proxy_protocol = 1_u8 # PROXY protocol version on unix domain socket connections
Expand Down
23 changes: 15 additions & 8 deletions src/lavinmq/federation/link.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ module LavinMQ
@last_unacked : UInt64?
@upstream_connection : ::AMQP::Client::Connection?
@downstream_connection : ::AMQP::Client::Connection?
@connection_intformation : ::AMQP::Client::ConnectionInformation?

def initialize(@upstream : Upstream, @log : Log)
user = @upstream.vhost.users.direct_user
Expand All @@ -29,8 +28,6 @@ module LavinMQ
uri = @upstream.uri
ui = uri.userinfo
@scrubbed_uri = ui.nil? ? uri.to_s : uri.to_s.sub("#{ui}@", "")
@connection_information = ::AMQP::Client::ConnectionInformation.new(
product: "LavinMQ", platform: "Crystal", product_version: LavinMQ::VERSION, platform_version: Crystal::VERSION)
end

def details_tuple
Expand Down Expand Up @@ -212,9 +209,13 @@ module LavinMQ
@downstream_connection.try &.close
upstream_uri = named_uri(@upstream.uri)
local_uri = named_uri(@local_uri)
::AMQP::Client.start(upstream_uri, @connection_information) do |c|
params = upstream_uri.query_params
params["product"] = "LavinMQ"
params["product_version"] = LavinMQ::VERSION.to_s
upstream_uri.query = params.to_s
::AMQP::Client.start(upstream_uri) do |c|
@upstream_connection = c
::AMQP::Client.start(local_uri, @connection_information) do |p|
::AMQP::Client.start(local_uri) do |p|
@downstream_connection = p
cch, q = setup_queue(c)
cch.prefetch(count: @upstream.prefetch)
Expand Down Expand Up @@ -305,8 +306,10 @@ module LavinMQ
upstream_uri = @upstream.uri.dup
params = upstream_uri.query_params
params["name"] ||= "Federation link cleanup: #{@upstream.name}/#{name}"
params["product"] = "LavinMQ"
params["product_version"] = LavinMQ::VERSION.to_s
upstream_uri.query = params.to_s
::AMQP::Client.start(upstream_uri, @connection_information) do |c|
::AMQP::Client.start(upstream_uri) do |c|
ch = c.channel
ch.queue_delete(@upstream_q)
end
Expand Down Expand Up @@ -356,9 +359,13 @@ module LavinMQ
@downstream_connection.try &.close
upstream_uri = named_uri(@upstream.uri)
local_uri = named_uri(@local_uri)
::AMQP::Client.start(upstream_uri, @connection_information) do |c|
params = upstream_uri.query_params
params["product"] = "LavinMQ"
params["product_version"] = LavinMQ::VERSION.to_s
upstream_uri.query = params.to_s
::AMQP::Client.start(upstream_uri) do |c|
@upstream_connection = c
::AMQP::Client.start(local_uri, @connection_information) do |p|
::AMQP::Client.start(local_uri) do |p|
@downstream_connection = p
cch, @consumer_q = setup(c)
cch.prefetch(count: @upstream.prefetch)
Expand Down

0 comments on commit a6878da

Please sign in to comment.