Skip to content

Commit

Permalink
Bugfix: federation binds to default exchange (backport #749) (#763)
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun authored Aug 26, 2024
1 parent 55a8585 commit a817039
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Exchange federation tried to bind to upstream's default exchange
- Shovel ack all unacked messages on stop
- Yield at end of while loop in Queue#drop_overflow to avoid holding the fiber for too long [#725](https://github.com/cloudamqp/lavinmq/pull/725)

Expand Down
16 changes: 16 additions & 0 deletions spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,22 @@ describe LavinMQ::Federation::Upstream do
end
end

{% for descr, v in {nil: nil, empty: ""} %}
describe "when @exchange is {{descr}}" do
it "should use downstream exchange name as upstream exchange" do
with_amqp_server do |s|
vhost = s.vhosts["/"]

vhost.declare_exchange("ex1", "topic", true, false)

upstream = LavinMQ::Federation::Upstream.new(vhost, "test", "amqp://", {{v}})
link1 = upstream.link(vhost.exchanges["ex1"])

link1.@upstream_exchange.should eq "ex1"
end
end
end
{% end %}
describe "when @queue is nil" do
describe "#link(Queue)" do
it "should create link that consumes upstream queue with same name as downstream queue" do
Expand Down
5 changes: 4 additions & 1 deletion src/lavinmq/federation/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ module LavinMQ
if link = @ex_links[federated_exchange.name]?
return link
end
upstream_exchange = @exchange ||= federated_exchange.name
upstream_exchange = @exchange
if upstream_exchange.nil? || upstream_exchange.empty?
upstream_exchange = federated_exchange.name
end
upstream_q = "federation: #{upstream_exchange} -> #{System.hostname}:#{vhost.name}:#{federated_exchange.name}"
link = ExchangeLink.new(self, federated_exchange, upstream_q, upstream_exchange)
@ex_links[federated_exchange.name] = link
Expand Down

0 comments on commit a817039

Please sign in to comment.