diff --git a/CHANGELOG.md b/CHANGELOG.md
index 83fa39a2bb..f40835bfa0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
+- Support for message deduplication on exchanges and queues [854](https://github.com/cloudamqp/lavinmq/pull/854)
+
## [2.1.0] - 2025-01-16
### Changed
diff --git a/README.md b/README.md
index 1a12920a17..f0f5269f68 100644
--- a/README.md
+++ b/README.md
@@ -226,6 +226,7 @@ For questions or suggestions:
- Replication
- Stream queues
- Automatic leader election in clusters via etcd
+- Message deduplication
### Known differences to other AMQP servers
diff --git a/spec/deduplication_spec.cr b/spec/deduplication_spec.cr
new file mode 100644
index 0000000000..0c4769de69
--- /dev/null
+++ b/spec/deduplication_spec.cr
@@ -0,0 +1,150 @@
+require "./spec_helper"
+require "../src/lavinmq/deduplication.cr"
+
+describe LavinMQ::Deduplication do
+ describe LavinMQ::Deduplication::MemoryCache do
+ # With capacity 2, a third insert is expected to push out the first-inserted key.
+ it "should have a max size" do
+ cache = LavinMQ::Deduplication::MemoryCache(String).new(2)
+ cache.insert("item1")
+ cache.insert("item2")
+ cache.insert("item3")
+ cache.contains?("item1").should be_false
+ cache.contains?("item2").should be_true
+ cache.contains?("item3").should be_true
+ end
+
+ it "should store item without ttl" do
+ cache = LavinMQ::Deduplication::MemoryCache(String).new(10)
+ cache.insert("item1")
+ cache.contains?("item1").should be_true
+ cache.contains?("item2").should be_false
+ end
+
+ # TTLs are in milliseconds (MemoryCache#insert adds `ttl.milliseconds`):
+ # item1 expires after 1 ms, item2 after 300 ms, item3 never. The 200 ms
+ # sleep lands between the two expiries.
+ it "should respect ttl" do
+ cache = LavinMQ::Deduplication::MemoryCache(String).new(3)
+ cache.insert("item1", 1)
+ cache.insert("item2", 300)
+ cache.insert("item3")
+ sleep 0.2.seconds
+ cache.contains?("item1").should be_false
+ cache.contains?("item2").should be_true
+ cache.contains?("item3").should be_true
+ end
+ end
+end
+
+# Test double for Deduplication::Cache: records every call instead of caching.
+# `contains?` always answers false and `insert` stores nothing but the call itself.
+class MockCache < LavinMQ::Deduplication::Cache(AMQ::Protocol::Field)
+ # Method name => list of {key, ttl} tuples, one per recorded call.
+ # The default block creates the list on first access.
+ @counter = Hash(String, Array({String, UInt32?})).new do |h, k|
+ h[k] = Array({String, UInt32?}).new
+ end
+
+ # Records the lookup (with a nil ttl) and reports the key as not cached.
+ def contains?(key) : Bool
+ @counter["contains?"] << {key.as(String), nil}
+ false
+ end
+
+ # Records the key and the ttl it was inserted with.
+ def insert(key, ttl = nil)
+ @counter["insert"] << {key.as(String), ttl}
+ end
+
+ # Returns the calls recorded for the method named `key` ("contains?" or "insert").
+ def calls(key : String)
+ @counter[key]
+ end
+end
+
+describe LavinMQ::Deduplication::Deduper do
+ # Deduper#duplicate? only consults the cache; MockCache always answers false.
+ describe "duplicate?" do
+ it "should return false if \"x-deduplication-header\" is missing (no identifier, always unique)" do
+ mock = MockCache.new
+ deduper = LavinMQ::Deduplication::Deduper.new(mock)
+ props = LavinMQ::AMQP::Properties.new
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ res = deduper.duplicate?(msg)
+ res.should be_false
+ end
+
+ it "should check cache if entry exists" do
+ mock = MockCache.new
+ deduper = LavinMQ::Deduplication::Deduper.new(mock)
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "x-deduplication-header" => "msg1",
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ deduper.duplicate?(msg)
+ mock.calls("contains?").size.should eq 1
+ end
+ end
+
+ # Deduper#add inserts the header value into the cache together with a TTL;
+ # the examples inspect the {key, ttl} tuples recorded by MockCache.
+ describe "add" do
+ it "should not insert into cache if header is missing" do
+ mock = MockCache.new
+ deduper = LavinMQ::Deduplication::Deduper.new(mock)
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new)
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ deduper.add(msg)
+ mock.calls("insert").size.should eq 0
+ end
+
+ it "should only insert into cache if header has a value" do
+ mock = MockCache.new
+ deduper = LavinMQ::Deduplication::Deduper.new(mock)
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "x-deduplication-header" => "msg1",
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ deduper.add(msg)
+ mock.calls("insert").size.should eq 1
+ end
+
+ it "should respect x-cache-ttl on message" do
+ mock = MockCache.new
+ deduper = LavinMQ::Deduplication::Deduper.new(mock)
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "x-deduplication-header" => "msg1",
+ "x-cache-ttl" => 10,
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ deduper.add(msg)
+ calls = mock.calls("insert")
+ calls.first[0].should eq "msg1"
+ calls.first[1].should eq 10
+ end
+
+ it "should fallback to default ttl" do
+ mock = MockCache.new
+ deduper = LavinMQ::Deduplication::Deduper.new(mock, 12)
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "x-deduplication-header" => "msg1",
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ deduper.add(msg)
+ calls = mock.calls("insert")
+ calls.first[0].should eq "msg1"
+ calls.first[1].should eq 12
+ end
+
+ it "should prio message ttl over default ttl" do
+ mock = MockCache.new
+ deduper = LavinMQ::Deduplication::Deduper.new(mock, 12)
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "x-deduplication-header" => "msg1",
+ "x-cache-ttl" => 10,
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ deduper.add(msg)
+ calls = mock.calls("insert")
+ calls.first[0].should eq "msg1"
+ calls.first[1].should eq 10
+ end
+
+ it "should allow checking any header for dedups" do
+ mock = MockCache.new
+ deduper = LavinMQ::Deduplication::Deduper.new(mock, 10, "custom")
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "custom" => "msg1",
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ deduper.add(msg)
+ calls = mock.calls("insert")
+ calls.first[0].should eq "msg1"
+ calls.first[1].should eq 10
+ end
+ end
+end
diff --git a/spec/exchange_spec.cr b/spec/exchange_spec.cr
index 78cce29459..9772662040 100644
--- a/spec/exchange_spec.cr
+++ b/spec/exchange_spec.cr
@@ -170,4 +170,64 @@ describe LavinMQ::Exchange do
end
end
end
+ describe "message deduplication" do
+ # Publishes two messages carrying the same "x-deduplication-header" value.
+ # The first is routed to the bound queue; the second is expected to be
+ # counted in dedup_count and routed to no queue.
+ it "should handle message deduplication" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ args = AMQP::Client::Arguments.new({
+ "x-message-deduplication" => true,
+ "x-cache-size" => 10,
+ })
+ ch.exchange("test", "topic", args: args)
+ ch.queue.bind("test", "#")
+ ex = s.vhosts["/"].exchanges["test"]
+ q = s.vhosts["/"].queues.first_value
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "x-deduplication-header" => "msg1",
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ ex.publish(msg, false).should eq 1
+ ex.dedup_count.should eq 0
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "x-deduplication-header" => "msg1",
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ ex.publish(msg, false).should eq 0
+ ex.dedup_count.should eq 1
+
+ q.message_count.should eq 1
+ end
+ end
+ end
+
+ # Same scenario, but the exchange argument "x-deduplication-header" names
+ # the message header ("custom") that carries the deduplication value.
+ it "should handle message deduplication, on custom header" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ args = AMQP::Client::Arguments.new({
+ "x-message-deduplication" => true,
+ "x-cache-size" => 10,
+ "x-deduplication-header" => "custom",
+ })
+ ch.exchange("test", "topic", args: args)
+ ch.queue.bind("test", "#")
+ ex = s.vhosts["/"].exchanges["test"]
+ q = s.vhosts["/"].queues.first_value
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "custom" => "msg1",
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ ex.publish(msg, false).should eq 1
+ ex.dedup_count.should eq 0
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "custom" => "msg1",
+ }))
+ msg = LavinMQ::Message.new("ex", "rk", "body", props)
+ ex.publish(msg, false).should eq 0
+ ex.dedup_count.should eq 1
+
+ q.message_count.should eq 1
+ end
+ end
+ end
+ end
end
diff --git a/spec/queue_spec.cr b/spec/queue_spec.cr
index 4a8860f60c..96fdea40d0 100644
--- a/spec/queue_spec.cr
+++ b/spec/queue_spec.cr
@@ -424,4 +424,25 @@ describe LavinMQ::AMQP::Queue do
FileUtils.rm_rf tmpdir if tmpdir
end
end
+
+ describe "deduplication" do
+ # Publishes the same "x-deduplication-header" value twice to a queue declared
+ # with deduplication enabled; only the first message should be retrievable.
+ it "should not accept message if it's a duplicate" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ queue_name = "dedup-queue"
+ q1 = ch.queue(queue_name, args: AMQP::Client::Arguments.new({
+ "x-message-deduplication" => true,
+ "x-cache-size" => 10,
+ }))
+ props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
+ "x-deduplication-header" => "msg1",
+ }))
+ ch.default_exchange.publish_confirm("body", queue_name, props: props)
+ ch.default_exchange.publish_confirm("body", queue_name, props: props)
+ q1.get(no_ack: false).not_nil!.body_io.to_s.should eq "body"
+ q1.get(no_ack: false).should be_nil
+ end
+ end
+ end
+ end
end
diff --git a/src/lavinmq/amqp/exchange/exchange.cr b/src/lavinmq/amqp/exchange/exchange.cr
index f8e8685496..6cbe59f388 100644
--- a/src/lavinmq/amqp/exchange/exchange.cr
+++ b/src/lavinmq/amqp/exchange/exchange.cr
@@ -28,9 +28,10 @@ module LavinMQ
@alternate_exchange : String?
@delayed_queue : Queue?
@deleted = false
+ @deduper : Deduplication::Deduper?
- rate_stats({"publish_in", "publish_out", "unroutable"})
- property publish_in_count, publish_out_count, unroutable_count
+ rate_stats({"publish_in", "publish_out", "unroutable", "dedup"})
+ property publish_in_count, publish_out_count, unroutable_count, dedup_count
def initialize(@vhost : VHost, @name : String, @durable = false,
@auto_delete = false, @internal = false,
@@ -70,6 +71,20 @@ module LavinMQ
@delayed = true
init_delayed_queue
end
+ if @arguments["x-message-deduplication"]?.try(&.as?(Bool))
+ ttl = parse_header("x-cache-ttl", Int).try(&.to_u32)
+ size = parse_header("x-cache-size", Int).try(&.to_u32)
+ raise LavinMQ::Error::PreconditionFailed.new("Invalid x-cache-size for message deduplication") unless size
+ header_key = parse_header("x-deduplication-header", String)
+ cache = Deduplication::MemoryCache(AMQ::Protocol::Field).new(size)
+ @deduper = Deduplication::Deduper.new(cache, ttl, header_key)
+ end
+ end
+
+ # Looks up `header` in @arguments. Evaluates to nil when the argument is
+ # absent, to the value cast to `type` when it has that type, and raises
+ # PreconditionFailed when it is present with another type.
+ private macro parse_header(header, type)
+ if value = @arguments["{{ header.id }}"]?
+ value.as?({{ type }}) || raise LavinMQ::Error::PreconditionFailed.new("{{ header.id }} header not a {{ type.id }}")
+ end
end
def details_tuple
@@ -146,6 +161,13 @@ module LavinMQ
queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new,
exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Int32
@publish_in_count += 1
+ if d = @deduper
+ if d.duplicate?(msg)
+ @dedup_count += 1
+ return 0
+ end
+ d.add(msg)
+ end
count = do_publish(msg, immediate, queues, exchanges)
@unroutable_count += 1 if count.zero?
@publish_out_count += count
diff --git a/src/lavinmq/amqp/queue/queue.cr b/src/lavinmq/amqp/queue/queue.cr
index 67e2382d2a..2f826d0e2e 100644
--- a/src/lavinmq/amqp/queue/queue.cr
+++ b/src/lavinmq/amqp/queue/queue.cr
@@ -13,6 +13,7 @@ require "./state"
require "./event"
require "./message_store"
require "../../unacked_message"
+require "../../deduplication"
module LavinMQ::AMQP
class Queue < LavinMQ::Queue
@@ -111,7 +112,7 @@ module LavinMQ::AMQP
# Creates @[x]_count and @[x]_rate and @[y]_log
rate_stats(
- {"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable"},
+ {"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable", "dedup"},
{"message_count", "unacked_count"})
getter name, arguments, vhost, consumers, last_get_time
@@ -127,6 +128,7 @@ module LavinMQ::AMQP
@data_dir : String
Log = LavinMQ::Log.for "queue"
@metadata : ::Log::Metadata
+ @deduper : Deduplication::Deduper?
def initialize(@vhost : VHost, @name : String,
@exclusive = false, @auto_delete = false,
@@ -271,6 +273,14 @@ module LavinMQ::AMQP
@single_active_consumer_queue = parse_header("x-single-active-consumer", Bool) == true
@consumer_timeout = parse_header("x-consumer-timeout", Int).try &.to_u64
validate_positive("x-consumer-timeout", @consumer_timeout)
+ if parse_header("x-message-deduplication", Bool)
+ size = parse_header("x-cache-size", Int).try(&.to_u32)
+ raise LavinMQ::Error::PreconditionFailed.new("Invalid x-cache-size for message deduplication") unless size
+ ttl = parse_header("x-cache-ttl", Int).try(&.to_u32)
+ header_key = parse_header("x-deduplication-header", String)
+ cache = Deduplication::MemoryCache(AMQ::Protocol::Field).new(size)
+ @deduper = Deduplication::Deduper.new(cache, ttl, header_key)
+ end
end
private macro parse_header(header, type)
@@ -421,6 +431,13 @@ module LavinMQ::AMQP
def publish(msg : Message) : Bool
return false if @deleted || @state.closed?
+ if d = @deduper
+ if d.duplicate?(msg)
+ @dedup_count += 1
+ return false
+ end
+ d.add(msg)
+ end
reject_on_overflow(msg)
@msg_store_lock.synchronize do
@msg_store.push(msg)
diff --git a/src/lavinmq/deduplication.cr b/src/lavinmq/deduplication.cr
new file mode 100644
index 0000000000..fd3cd0d49f
--- /dev/null
+++ b/src/lavinmq/deduplication.cr
@@ -0,0 +1,72 @@
+module LavinMQ
+ module Deduplication
+ # Interface of a deduplication cache keyed by values of type T.
+ abstract class Cache(T)
+ # Whether `key` is currently held by the cache.
+ abstract def contains?(key : T) : Bool
+ # Stores `key`; `ttl` is an optional lifetime for the entry
+ # (MemoryCache interprets it as milliseconds).
+ abstract def insert(key : T, ttl : UInt32?)
+ end
+
+ # In-memory Cache holding at most `size` keys. When full, the first-inserted
+ # key is dropped to make room. Entries may carry an expiry; all access to the
+ # store goes through a Mutex.
+ class MemoryCache(T) < Cache(T)
+ # size: maximum number of keys kept at once; 0 means nothing is cached.
+ def initialize(@size : UInt32)
+ @lock = Mutex.new
+ # Value is the monotonic deadline of the entry, or nil when it never expires.
+ @store = Hash(T, Time::Span?).new(initial_capacity: @size)
+ end
+
+ # True when `key` is stored and has not expired.
+ # An entry found to be expired is removed during the lookup.
+ def contains?(key : T) : Bool
+ @lock.synchronize do
+ return false unless @store.has_key?(key)
+ ttd = @store[key]
+ return true unless ttd
+ return true if ttd > RoughTime.monotonic
+ @store.delete(key)
+ false
+ end
+ end
+
+ # Stores `key`, expiring `ttl` milliseconds from now (never, when ttl is nil).
+ def insert(key : T, ttl : UInt32? = nil)
+ # A zero-sized cache holds nothing; Hash#shift would raise on the empty store.
+ return if @size.zero?
+ @lock.synchronize do
+ # Only a new key grows the store, so only then is the oldest entry evicted.
+ @store.shift if @store.size >= @size && !@store.has_key?(key)
+ @store[key] = ttl.try { |v| RoughTime.monotonic + v.milliseconds }
+ end
+ end
+ end
+
+ # Decides whether a message was seen before by looking up the value of one
+ # of its headers in a Cache.
+ class Deduper
+ # Header consulted when no custom header key is configured.
+ DEFAULT_HEADER_KEY = "x-deduplication-header"
+
+ # cache: store of already-seen header values.
+ # default_ttl: TTL passed to the cache when the message has no usable "x-cache-ttl" header.
+ # header_key: name of the header holding the deduplication value; DEFAULT_HEADER_KEY when nil.
+ def initialize(@cache : Cache(AMQ::Protocol::Field), @default_ttl : UInt32? = nil,
+ @header_key : String? = nil)
+ end
+
+ # Remembers the message's deduplication value. Does nothing when the
+ # message has no headers or lacks the deduplication header.
+ def add(msg : Message)
+ key = dedup_key(msg)
+ return unless key
+ @cache.insert(key, dedup_ttl(msg))
+ end
+
+ # True when the message's deduplication value is already in the cache.
+ # A message without that header is never a duplicate.
+ def duplicate?(msg : Message) : Bool
+ key = dedup_key(msg)
+ return false unless key
+ @cache.contains?(key)
+ end
+
+ # Value of the deduplication header, or nil when the message has none.
+ private def dedup_key(msg)
+ headers = msg.properties.headers
+ return unless headers
+ key = @header_key || DEFAULT_HEADER_KEY
+ headers[key]?
+ end
+
+ # TTL for this message's cache entry: the "x-cache-ttl" header when it is an
+ # integer that fits in a UInt32, otherwise the default TTL.
+ private def dedup_ttl(msg) : UInt32?
+ def_ttl = @default_ttl
+ headers = msg.properties.headers
+ return def_ttl unless headers
+ # Accept any integer width, matching how the queue/exchange argument is parsed.
+ value = headers["x-cache-ttl"]?.as?(Int)
+ return def_ttl unless value
+ # Int#to_u32 raises OverflowError outside 0..UInt32::MAX; fall back instead
+ # of letting a bad header abort the publish.
+ return def_ttl unless 0 <= value <= UInt32::MAX
+ value.to_u32
+ end
+ end
+ end
+end
diff --git a/src/lavinmq/http/controller/prometheus.cr b/src/lavinmq/http/controller/prometheus.cr
index fb4a85a4cd..e8c1276595 100644
--- a/src/lavinmq/http/controller/prometheus.cr
+++ b/src/lavinmq/http/controller/prometheus.cr
@@ -11,7 +11,8 @@ module LavinMQ
NamedTuple(name: String) |
NamedTuple(channel: String) |
NamedTuple(id: String) |
- NamedTuple(queue: String, vhost: String)
+ NamedTuple(queue: String, vhost: String) |
+ NamedTuple(exchange: String, vhost: String)
alias Metric = NamedTuple(name: String, value: MetricValue) |
NamedTuple(name: String, value: MetricValue, labels: MetricLabels) |
NamedTuple(name: String, value: MetricValue, help: String) |
@@ -62,6 +63,7 @@ module LavinMQ
vhosts.to_a
end
+ # ameba:disable Metrics/CyclomaticComplexity
private def register_routes
get "/metrics" do |context, _|
context.response.content_type = "text/plain"
@@ -97,6 +99,8 @@ module LavinMQ
detailed_connection_coarse_metrics(vhosts, writer)
when "channel_metrics"
detailed_channel_metrics(vhosts, writer)
+ when "exchange_metrics"
+ detailed_exchange_metrics(vhosts, writer)
end
end
end
@@ -347,6 +351,11 @@ module LavinMQ
type: "gauge",
labels: labels,
help: "Sum of ready and unacknowledged messages - total queue depth"})
+ # "counter" is the Prometheus metric type name; "count" is not a valid type.
+ writer.write({name: "detailed_queue_deduplication",
+ value: q.dedup_count,
+ type: "counter",
+ labels: labels,
+ help: "Number of deduplicated messages for this queue"})
end
end
end
@@ -364,6 +373,19 @@ module LavinMQ
end
end
+ # Writes one deduplication counter per exchange of every given vhost,
+ # labelled with the exchange name and the vhost name.
+ private def detailed_exchange_metrics(vhosts, writer)
+ vhosts.each do |vhost|
+ vhost.exchanges.each_value do |e|
+ labels = {exchange: e.name, vhost: vhost.name}
+ # Named after the exchange so it does not collide with the per-queue
+ # metric; "counter" is the Prometheus metric type name.
+ writer.write({name: "detailed_exchange_deduplication",
+ value: e.dedup_count,
+ type: "counter",
+ labels: labels,
+ help: "Number of deduplicated messages for this exchange"})
+ end
+ end
+ end
+
private def detailed_connection_coarse_metrics(vhosts, writer)
vhosts.each do |vhost|
vhost.connections.each do |conn|
diff --git a/views/exchanges.ecr b/views/exchanges.ecr
index 844d73a095..580df4cc36 100644
--- a/views/exchanges.ecr
+++ b/views/exchanges.ecr
@@ -85,6 +85,18 @@
Alternate Exchange
If messages to this exchange cannot otherwise be routed, send them to the alternate exchange named here.
+ Message deduplication
+ Enable deduplication for this exchange
+ |
+ Deduplication cache size
+ Deduplication cache size, in number of entries
+ |
+ Deduplication cache ttl
+ How long an entry lives in the deduplication cache, in milliseconds
+ |
+ Deduplication header
+ Which header to check for deduplication, defaults to x-deduplication-header
+
diff --git a/views/queues.ecr b/views/queues.ecr
index 74a5e8f535..bfe1c27ac2 100644
--- a/views/queues.ecr
+++ b/views/queues.ecr
@@ -115,6 +115,18 @@
Valid units are Y(ear), M(onth), D(ays), h(ours), m(inutes), s(seconds).
Segments are only deleted when new messages are published to the stream queue.
+ Message deduplication
+ Enable deduplication for this queue
+ |
+ Deduplication cache size
+ Deduplication cache size, in number of entries
+ |
+ Deduplication cache ttl
+ How long an entry lives in the deduplication cache, in milliseconds
+ |
+ Deduplication header
+ Which header to check for deduplication, defaults to x-deduplication-header
+