Skip to content

Commit

Permalink
Message deduplication (#854)
Browse files Browse the repository at this point in the history
Implementation of message deduplication on exchanges and queues.

Features

Deduplication on Exchanges
Deduplication cache storage in memory
TTL for each message
Default TTL
Deduplication on queues

Fixes #833
  • Loading branch information
snichme authored Jan 23, 2025
1 parent 75f2cfd commit 74d70a9
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Support for message deduplication on exchanges and queues [854](https://github.com/cloudamqp/lavinmq/pull/854)

## [2.1.0] - 2025-01-16

### Changed
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ For questions or suggestions:
- Replication
- Stream queues
- Automatic leader election in clusters via etcd
- Message deduplication

### Known differences to other AMQP servers

Expand Down
150 changes: 150 additions & 0 deletions spec/deduplication_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
require "./spec_helper"
require "../src/lavinmq/deduplication.cr"

describe LavinMQ::Deduplication do
  describe LavinMQ::Deduplication::MemoryCache do
    # With a capacity of 2, inserting a third key must push out the first one.
    it "should have a max size" do
      bounded = LavinMQ::Deduplication::MemoryCache(String).new(2)
      %w(item1 item2 item3).each { |key| bounded.insert(key) }
      bounded.contains?("item1").should be_false
      bounded.contains?("item2").should be_true
      bounded.contains?("item3").should be_true
    end

    # Keys inserted without a ttl are reported as present; unknown keys are not.
    it "should store item without ttl" do
      store = LavinMQ::Deduplication::MemoryCache(String).new(10)
      store.insert("item1")
      store.contains?("item1").should be_true
      store.contains?("item2").should be_false
    end

    # After a 200 ms pause the entry with ttl 1 must be gone, while the entry
    # with ttl 300 and the entry without ttl must remain.
    # NOTE(review): this implies the ttl unit is milliseconds - confirm against MemoryCache.
    it "should respect ttl" do
      store = LavinMQ::Deduplication::MemoryCache(String).new(3)
      store.insert("item1", 1)
      store.insert("item2", 300)
      store.insert("item3")
      sleep 200.milliseconds
      store.contains?("item1").should be_false
      store.contains?("item2").should be_true
      store.contains?("item3").should be_true
    end
  end
end

# Test double for `LavinMQ::Deduplication::Cache` that stores nothing and
# instead records every `contains?`/`insert` call so specs can inspect them.
class MockCache < LavinMQ::Deduplication::Cache(AMQ::Protocol::Field)
  # Call log keyed by method name; each entry is a `{key, ttl}` tuple.
  # Reading an unknown method name creates (and stores) an empty list.
  @counter = Hash(String, Array({String, UInt32?})).new do |log, method_name|
    log[method_name] = [] of {String, UInt32?}
  end

  # Records the lookup and always reports a miss.
  def contains?(key) : Bool
    @counter["contains?"].push({key.as(String), nil})
    false
  end

  # Records the key together with the ttl it was inserted with (nil when omitted).
  def insert(key, ttl = nil)
    @counter["insert"].push({key.as(String), ttl})
  end

  # Returns the recorded `{key, ttl}` tuples for the given method name.
  def calls(key : String)
    @counter[key]
  end
end

# Specs for `Deduper`, driven through `MockCache` so each example can assert
# exactly which cache calls were made and with which key/ttl.
describe LavinMQ::Deduplication::Deduper do
  describe "duplicate?" do
    # No headers at all: the message carries no identifier to compare.
    it "should return false if \"x-deduplication-header\" is missing (no identifier, always unique)" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      res = deduper.duplicate?(msg)
      res.should be_false
    end

    # With the header present, the cache must be consulted exactly once.
    it "should check cache if entry exists" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.duplicate?(msg)
      mock.calls("contains?").size.should eq 1
    end

    # Previously shared its title with the next example although it covers the
    # opposite case: an empty header table must not produce any insert.
    it "should not insert into cache if header is missing" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new)
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      mock.calls("insert").size.should eq 0
    end

    # Header present: exactly one insert is expected.
    it "should only insert into cache if header has a value" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      mock.calls("insert").size.should eq 1
    end

    # The per-message "x-cache-ttl" header is forwarded to the cache as the ttl.
    it "should respect x-cache-ttl on message" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
        "x-cache-ttl"            => 10,
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      calls = mock.calls("insert")
      calls.first[0].should eq "msg1"
      calls.first[1].should eq 10
    end

    # Without a per-message ttl, the default passed to the Deduper (12) is used.
    it "should fallback to default ttl" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock, 12)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      calls = mock.calls("insert")
      calls.first[0].should eq "msg1"
      calls.first[1].should eq 12
    end

    # When both are given, the message ttl (10) wins over the default (12).
    it "should prio message ttl over default ttl" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock, 12)
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "x-deduplication-header" => "msg1",
        "x-cache-ttl"            => 10,
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      calls = mock.calls("insert")
      calls.first[0].should eq "msg1"
      calls.first[1].should eq 10
    end

    # The third constructor argument selects which header holds the identifier.
    it "should allow checking any header for dedups" do
      mock = MockCache.new
      deduper = LavinMQ::Deduplication::Deduper.new(mock, 10, "custom")
      props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
        "custom" => "msg1",
      }))
      msg = LavinMQ::Message.new("ex", "rk", "body", props)
      deduper.add(msg)
      calls = mock.calls("insert")
      calls.first[0].should eq "msg1"
      calls.first[1].should eq 10
    end
  end
end
60 changes: 60 additions & 0 deletions spec/exchange_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,64 @@ describe LavinMQ::Exchange do
end
end
end
describe "message deduplication" do
  # Publishes two messages with the same "x-deduplication-header" value to a
  # dedup-enabled exchange: the first is routed, the second is dropped and counted.
  it "should handle message deduplication" do
    with_amqp_server do |s|
      with_channel(s) do |ch|
        exchange_args = AMQP::Client::Arguments.new({
          "x-message-deduplication" => true,
          "x-cache-size"            => 10,
        })
        ch.exchange("test", "topic", args: exchange_args)
        ch.queue.bind("test", "#")
        exchange = s.vhosts["/"].exchanges["test"]
        bound_queue = s.vhosts["/"].queues.first_value

        # First publish: unique, routed to the single bound queue.
        first_headers = LavinMQ::AMQP::Table.new({
          "x-deduplication-header" => "msg1",
        })
        first_msg = LavinMQ::Message.new("ex", "rk", "body", LavinMQ::AMQP::Properties.new(headers: first_headers))
        exchange.publish(first_msg, false).should eq 1
        exchange.dedup_count.should eq 0

        # Second publish with the same identifier: rejected as a duplicate.
        second_headers = LavinMQ::AMQP::Table.new({
          "x-deduplication-header" => "msg1",
        })
        second_msg = LavinMQ::Message.new("ex", "rk", "body", LavinMQ::AMQP::Properties.new(headers: second_headers))
        exchange.publish(second_msg, false).should eq 0
        exchange.dedup_count.should eq 1

        bound_queue.message_count.should eq 1
      end
    end
  end

  # Same scenario, but the exchange is told to read the identifier from the
  # "custom" header via the "x-deduplication-header" argument.
  it "should handle message deduplication, on custom header" do
    with_amqp_server do |s|
      with_channel(s) do |ch|
        exchange_args = AMQP::Client::Arguments.new({
          "x-message-deduplication" => true,
          "x-cache-size"            => 10,
          "x-deduplication-header"  => "custom",
        })
        ch.exchange("test", "topic", args: exchange_args)
        ch.queue.bind("test", "#")
        exchange = s.vhosts["/"].exchanges["test"]
        bound_queue = s.vhosts["/"].queues.first_value

        # First publish: unique, routed to the single bound queue.
        first_headers = LavinMQ::AMQP::Table.new({
          "custom" => "msg1",
        })
        first_msg = LavinMQ::Message.new("ex", "rk", "body", LavinMQ::AMQP::Properties.new(headers: first_headers))
        exchange.publish(first_msg, false).should eq 1
        exchange.dedup_count.should eq 0

        # Second publish with the same identifier: rejected as a duplicate.
        second_headers = LavinMQ::AMQP::Table.new({
          "custom" => "msg1",
        })
        second_msg = LavinMQ::Message.new("ex", "rk", "body", LavinMQ::AMQP::Properties.new(headers: second_headers))
        exchange.publish(second_msg, false).should eq 0
        exchange.dedup_count.should eq 1

        bound_queue.message_count.should eq 1
      end
    end
  end
end
end
21 changes: 21 additions & 0 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,25 @@ describe LavinMQ::AMQP::Queue do
FileUtils.rm_rf tmpdir if tmpdir
end
end

describe "deduplication" do
  # Publishes the same identified message twice to a dedup-enabled queue and
  # verifies only one copy can be fetched. (Title fixed: "except" -> "accept".)
  it "should not accept message if it's a duplicate" do
    with_amqp_server do |s|
      with_channel(s) do |ch|
        queue_name = "dedup-queue"
        q1 = ch.queue(queue_name, args: AMQP::Client::Arguments.new({
          "x-message-deduplication" => true,
          "x-cache-size"            => 10,
        }))
        props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
          "x-deduplication-header" => "msg1",
        }))
        # Both publishes carry the same dedup identifier ("msg1").
        ch.default_exchange.publish_confirm("body", queue_name, props: props)
        ch.default_exchange.publish_confirm("body", queue_name, props: props)
        # The first get returns the message; the second finds the queue empty.
        q1.get(no_ack: false).not_nil!.body_io.to_s.should eq "body"
        q1.get(no_ack: false).should be_nil
      end
    end
  end
end
end
26 changes: 24 additions & 2 deletions src/lavinmq/amqp/exchange/exchange.cr
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ module LavinMQ
@alternate_exchange : String?
@delayed_queue : Queue?
@deleted = false
@deduper : Deduplication::Deduper?

rate_stats({"publish_in", "publish_out", "unroutable"})
property publish_in_count, publish_out_count, unroutable_count
rate_stats({"publish_in", "publish_out", "unroutable", "dedup"})
property publish_in_count, publish_out_count, unroutable_count, dedup_count

def initialize(@vhost : VHost, @name : String, @durable = false,
@auto_delete = false, @internal = false,
Expand Down Expand Up @@ -70,6 +71,20 @@ module LavinMQ
@delayed = true
init_delayed_queue
end
if @arguments["x-message-deduplication"]?.try(&.as?(Bool))
ttl = parse_header("x-cache-ttl", Int).try(&.to_u32)
size = parse_header("x-cache-size", Int).try(&.to_u32)
raise LavinMQ::Error::PreconditionFailed.new("Invalid x-cache-size for message deduplication") unless size
header_key = parse_header("x-deduplication-header", String)
cache = Deduplication::MemoryCache(AMQ::Protocol::Field).new(size)
@deduper = Deduplication::Deduper.new(cache, ttl, header_key)
end
end

# Expands to a lookup of the argument named `header` in `@arguments`.
#
# - When the argument is absent (or its value is falsy), the expression
#   evaluates to nil because the `if` body is not entered.
# - When present, the value is cast with `as?` to `type`; a failed cast raises
#   `LavinMQ::Error::PreconditionFailed` with "<header> header not a <type>".
#
# NOTE(review): the expansion assigns a local named `value` at the call site;
# presumably callers do not rely on or clash with that name - confirm.
private macro parse_header(header, type)
if value = @arguments["{{ header.id }}"]?
value.as?({{ type }}) || raise LavinMQ::Error::PreconditionFailed.new("{{ header.id }} header not a {{ type.id }}")
end
end

def details_tuple
Expand Down Expand Up @@ -146,6 +161,13 @@ module LavinMQ
queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new,
exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Int32
@publish_in_count += 1
if d = @deduper
if d.duplicate?(msg)
@dedup_count += 1
return 0
end
d.add(msg)
end
count = do_publish(msg, immediate, queues, exchanges)
@unroutable_count += 1 if count.zero?
@publish_out_count += count
Expand Down
19 changes: 18 additions & 1 deletion src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require "./state"
require "./event"
require "./message_store"
require "../../unacked_message"
require "../../deduplication"

module LavinMQ::AMQP
class Queue < LavinMQ::Queue
Expand Down Expand Up @@ -111,7 +112,7 @@ module LavinMQ::AMQP

# Creates @[x]_count and @[x]_rate and @[y]_log
rate_stats(
{"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable"},
{"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable", "dedup"},
{"message_count", "unacked_count"})

getter name, arguments, vhost, consumers, last_get_time
Expand All @@ -127,6 +128,7 @@ module LavinMQ::AMQP
@data_dir : String
Log = LavinMQ::Log.for "queue"
@metadata : ::Log::Metadata
@deduper : Deduplication::Deduper?

def initialize(@vhost : VHost, @name : String,
@exclusive = false, @auto_delete = false,
Expand Down Expand Up @@ -271,6 +273,14 @@ module LavinMQ::AMQP
@single_active_consumer_queue = parse_header("x-single-active-consumer", Bool) == true
@consumer_timeout = parse_header("x-consumer-timeout", Int).try &.to_u64
validate_positive("x-consumer-timeout", @consumer_timeout)
if parse_header("x-message-deduplication", Bool)
size = parse_header("x-cache-size", Int).try(&.to_u32)
raise LavinMQ::Error::PreconditionFailed.new("Invalid x-cache-size for message deduplication") unless size
ttl = parse_header("x-cache-ttl", Int).try(&.to_u32)
header_key = parse_header("x-deduplication-header", String)
cache = Deduplication::MemoryCache(AMQ::Protocol::Field).new(size)
@deduper = Deduplication::Deduper.new(cache, ttl, header_key)
end
end

private macro parse_header(header, type)
Expand Down Expand Up @@ -421,6 +431,13 @@ module LavinMQ::AMQP

def publish(msg : Message) : Bool
return false if @deleted || @state.closed?
if d = @deduper
if d.duplicate?(msg)
@dedup_count += 1
return false
end
d.add(msg)
end
reject_on_overflow(msg)
@msg_store_lock.synchronize do
@msg_store.push(msg)
Expand Down
Loading

0 comments on commit 74d70a9

Please sign in to comment.