Skip to content

Commit

Permalink
send connection_information to amqp-client (#613)
Browse files Browse the repository at this point in the history
* send client_information to amqp-client

* refactor start_link

---------

Co-authored-by: Viktor Erlingsson <[email protected]>
  • Loading branch information
kickster97 and viktorerlingsson committed Sep 19, 2024
1 parent 0713d51 commit 2e0ba2a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 72 deletions.
131 changes: 66 additions & 65 deletions src/lavinmq/federation/link.cr
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,24 @@ module LavinMQ
private abstract def start_link
private abstract def unregister_observer

private def start_link_common(&)
return if @state.terminated?
@upstream_connection.try &.close
@downstream_connection.try &.close
upstream_uri = named_uri(@upstream.uri)
params = upstream_uri.query_params
params["product"] = "LavinMQ"
params["product_version"] = LavinMQ::VERSION.to_s
upstream_uri.query = params.to_s
::AMQP::Client.start(upstream_uri) do |c|
@upstream_connection = c
::AMQP::Client.start(named_uri(@local_uri)) do |p|
@downstream_connection = p
yield c, p
end
end
end

enum State
Starting
Running
Expand Down Expand Up @@ -204,43 +222,34 @@ module LavinMQ
end

private def start_link
return if @state.terminated?
@upstream_connection.try &.close
@downstream_connection.try &.close
upstream_uri = named_uri(@upstream.uri)
local_uri = named_uri(@local_uri)
::AMQP::Client.start(upstream_uri) do |c|
@upstream_connection = c
::AMQP::Client.start(local_uri) do |p|
@downstream_connection = p
cch, q = setup_queue(c)
cch.prefetch(count: @upstream.prefetch)
pch = p.channel
pch.confirm_select if @upstream.ack_mode.on_confirm?
no_ack = @upstream.ack_mode.no_ack?
state(State::Running)
unless @federated_q.immediate_delivery?
@log.debug { "Waiting for consumers" }
select
when @consumer_available.receive?
when timeout(1.second)
return if @upstream_connection.try &.closed?
end
end
q_name = q[:queue_name]
cch.basic_consume(q_name, no_ack: no_ack, tag: @upstream.consumer_tag, block: true) do |msg|
@last_changed = RoughTime.unix_ms
headers, received_from = received_from_header(msg)
received_from << ::AMQP::Client::Arguments.new({
"uri" => @scrubbed_uri,
"queue" => q_name,
"redelivered" => msg.redelivered,
})
headers["x-received-from"] = received_from
msg.properties.headers = headers
federate(msg, pch, cch.not_nil!, EXCHANGE, @federated_q.name)
start_link_common do |c, p|
cch, q = setup_queue(c)
cch.prefetch(count: @upstream.prefetch)
pch = p.channel
pch.confirm_select if @upstream.ack_mode.on_confirm?
no_ack = @upstream.ack_mode.no_ack?
state(State::Running)
unless @federated_q.immediate_delivery?
@log.debug { "Waiting for consumers" }
select
when @consumer_available.receive?
when timeout(1.second)
return if @upstream_connection.try &.closed?
end
end
q_name = q[:queue_name]
cch.basic_consume(q_name, no_ack: no_ack, tag: @upstream.consumer_tag, block: true) do |msg|
@last_changed = RoughTime.unix_ms
headers, received_from = received_from_header(msg)
received_from << ::AMQP::Client::Arguments.new({
"uri" => @scrubbed_uri,
"queue" => q_name,
"redelivered" => msg.redelivered,
})
headers["x-received-from"] = received_from
msg.properties.headers = headers
federate(msg, pch, cch.not_nil!, EXCHANGE, @federated_q.name)
end
end
end
end
Expand Down Expand Up @@ -306,6 +315,8 @@ module LavinMQ
upstream_uri = @upstream.uri.dup
params = upstream_uri.query_params
params["name"] ||= "Federation link cleanup: #{@upstream.name}/#{name}"
params["product"] = "LavinMQ"
params["product_version"] = LavinMQ::VERSION.to_s
upstream_uri.query = params.to_s
::AMQP::Client.start(upstream_uri) do |c|
ch = c.channel
Expand Down Expand Up @@ -352,37 +363,27 @@ module LavinMQ
end

private def start_link
return if @state.terminated?
@upstream_connection.try &.close
@downstream_connection.try &.close
upstream_uri = named_uri(@upstream.uri)
local_uri = named_uri(@local_uri)
::AMQP::Client.start(upstream_uri) do |c|
@upstream_connection = c
::AMQP::Client.start(local_uri) do |p|
@downstream_connection = p
cch, @consumer_q = setup(c)
cch.prefetch(count: @upstream.prefetch)
pch = p.channel
pch.confirm_select if @upstream.ack_mode.on_confirm?
no_ack = @upstream.ack_mode.no_ack?
state(State::Running)

cch.basic_consume(@upstream_q, no_ack: no_ack, tag: @upstream.consumer_tag, block: true) do |msg|
@last_changed = RoughTime.unix_ms
headers, received_from = received_from_header(msg)
received_from << ::AMQP::Client::Arguments.new({
"uri" => @scrubbed_uri,
"exchange" => @upstream_exchange,
"redelivered" => msg.redelivered,
})
headers["x-received-from"] = received_from
msg.properties.headers = headers
federate(msg, pch, cch.not_nil!, @federated_ex.name, msg.routing_key)
end
ensure
@consumer_q = nil
start_link_common do |c, p|
cch, @consumer_q = setup(c)
cch.prefetch(count: @upstream.prefetch)
pch = p.channel
pch.confirm_select if @upstream.ack_mode.on_confirm?
no_ack = @upstream.ack_mode.no_ack?
state(State::Running)
cch.basic_consume(@upstream_q, no_ack: no_ack, tag: @upstream.consumer_tag, block: true) do |msg|
@last_changed = RoughTime.unix_ms
headers, received_from = received_from_header(msg)
received_from << ::AMQP::Client::Arguments.new({
"uri" => @scrubbed_uri,
"exchange" => @upstream_exchange,
"redelivered" => msg.redelivered,
})
headers["x-received-from"] = received_from
msg.properties.headers = headers
federate(msg, pch, cch.not_nil!, @federated_ex.name, msg.routing_key)
end
ensure
@consumer_q = nil
end
end
end
Expand Down
17 changes: 14 additions & 3 deletions static/js/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,20 @@ function updateConnection (all) {
const cp = item.client_properties
document.getElementById('cp-name').textContent = cp.connection_name
document.getElementById('cp-capabilities').textContent = DOM.jsonToText(cp.capabilities)
document.getElementById('cp-product').textContent = cp.product
document.getElementById('cp-platform').textContent = cp.platform
document.getElementById('cp-version').textContent = cp.version
if (cp.product_version) {
document.getElementById('cp-product').appendChild(document.createElement('span')).textContent = cp.product
document.getElementById('cp-product').appendChild(document.createElement('br'))
document.getElementById('cp-product').appendChild(document.createElement('small')).textContent = 'Verison: ' + cp.product_version
} else {
document.getElementById('cp-product').textContent = cp.product
}
if (cp.platform_version) {
document.getElementById('cp-platform').appendChild(document.createElement('span')).textContent = cp.platform
document.getElementById('cp-platform').appendChild(document.createElement('br'))
document.getElementById('cp-platform').appendChild(document.createElement('small')).textContent = 'Verison: ' + cp.platform_version
} else {
document.getElementById('cp-platform').textContent = cp.platform
}
const infoEl = document.getElementById('cp-information')
if (cp.information && cp.information.startsWith('http')) {
const infoLink = document.createElement('a')
Expand Down
4 changes: 0 additions & 4 deletions views/connection.ecr
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@
<th>Platform</th>
<td id="cp-platform"></td>
</tr>
<tr>
<th>Version</th>
<td id="cp-version"></td>
</tr>
<tr>
<th>Information</th>
<td id="cp-information"></td>
Expand Down

0 comments on commit 2e0ba2a

Please sign in to comment.