Skip to content

Commit

Permalink
refactor specs
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Apr 2, 2024
1 parent 84ae73e commit fb24e65
Showing 1 changed file with 23 additions and 29 deletions.
52 changes: 23 additions & 29 deletions spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -253,51 +253,46 @@ describe LavinMQ::Federation::Upstream do
end

it "should not transfer messages unless downstream has consumer" do
downstream_queue_name = Random::Secure.hex(10)
upstream_queue_name = Random::Secure.hex(10)
upstream, upstream_vhost, downstream_vhost = UpstreamSpecHelpers.setup_federation(Random::Secure.hex(10), nil, upstream_queue_name)
ds_queue_name = Random::Secure.hex(10)
us_queue_name = Random::Secure.hex(10)
upstream, us_vhost, ds_vhost = UpstreamSpecHelpers.setup_federation(Random::Secure.hex(10), nil, us_queue_name)

UpstreamSpecHelpers.start_link(upstream, downstream_queue_name, "queues")
Server.users.add_permission("guest", upstream_vhost.name, /.*/, /.*/, /.*/)
Server.users.add_permission("guest", downstream_vhost.name, /.*/, /.*/, /.*/)
UpstreamSpecHelpers.start_link(upstream, ds_queue_name, "queues")
Server.users.add_permission("guest", us_vhost.name, /.*/, /.*/, /.*/)
Server.users.add_permission("guest", ds_vhost.name, /.*/, /.*/, /.*/)

with_channel(vhost: upstream_vhost.name) do |upstream_ch|
upstream_q = upstream_ch.queue(upstream_queue_name)
# publish 1 message
with_channel(vhost: us_vhost.name) do |upstream_ch|
upstream_q = upstream_ch.queue(us_queue_name)
upstream_q.publish "msg1"
wait_for { Server.vhosts[upstream_vhost.name].queues[upstream_queue_name].message_count == 1 }
end

with_channel(vhost: downstream_vhost.name) do |downstream_ch|
msgs = Channel(String).new
downstream_q = downstream_ch.queue(downstream_queue_name)
wait_for { downstream_vhost.queues[downstream_queue_name].policy.try(&.name) == "FE" }

# Assert setup is correct
# consume 1 message
with_channel(vhost: ds_vhost.name) do |downstream_ch|
downstream_q = downstream_ch.queue(ds_queue_name)
wait_for { ds_vhost.queues[ds_queue_name].policy.try(&.name) == "FE" }
wait_for { upstream.links.first?.try &.state.running? }

msgs = Channel(String).new
downstream_q.subscribe do |msg|
msgs.send msg.body_io.to_s
end
msgs.receive.should eq "msg1"
wait_for { Server.vhosts[downstream_vhost.name].queues[downstream_queue_name].message_count == 0 }
wait_for { Server.vhosts[ds_vhost.name].queues[ds_queue_name].message_count == 0 }
end

queue_up = Server.vhosts[upstream_vhost.name].queues[upstream_queue_name]
queue_down = Server.vhosts[downstream_vhost.name].queues[downstream_queue_name]
wait_for { queue_down.consumers.empty? }
wait_for { Server.vhosts[ds_vhost.name].queues[ds_queue_name].consumers.empty? }

with_channel(vhost: upstream_vhost.name) do |upstream_ch|
upstream_q = upstream_ch.queue(upstream_queue_name)
# publish another message
with_channel(vhost: us_vhost.name) do |upstream_ch|
upstream_q = upstream_ch.queue(us_queue_name)
upstream_q.publish "msg2"
wait_for { queue_up.message_count == 1 }
wait_for { queue_down.message_count == 0 }
end

# resume consuming on downstream, both upstream and downstream should be empty
with_channel(vhost: downstream_vhost.name) do |downstream_ch3|
# resume consuming on downstream, should get 1 message
with_channel(vhost: ds_vhost.name) do |downstream_ch|
msgs = Channel(String).new
downstream_q2 = downstream_ch3.queue(downstream_queue_name)
downstream_q2.subscribe do |msg|
downstream_ch.queue(ds_queue_name).subscribe do |msg|
msgs.send msg.body_io.to_s
end
msgs.receive.should eq "msg2"
Expand Down Expand Up @@ -347,9 +342,8 @@ describe LavinMQ::Federation::Upstream do

# resume consuming on downstream, should get 1 message
with_channel(vhost: ds_vhost.name) do |downstream_ch|
downstream_q = downstream_ch.queue(ds_queue_name)
messages_consumed = 0
downstream_q.subscribe(tag: "c2") do |_msg|
downstream_ch.queue(ds_queue_name).subscribe(tag: "c2") do |_msg|
messages_consumed += 1
end
wait_for { Server.vhosts[us_vhost.name].queues[us_queue_name].message_count == 0 }
Expand Down

0 comments on commit fb24e65

Please sign in to comment.