From 0f18e4a1ad4adc82199f341a2fb24d9f777c290f Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 09:51:51 +0100 Subject: [PATCH 01/12] specs --- spec/deduplication_spec.cr | 150 +++++++++++++++++++++++++++++++++++++ spec/exchange_spec.cr | 60 +++++++++++++++ spec/queue_spec.cr | 21 ++++++ 3 files changed, 231 insertions(+) create mode 100644 spec/deduplication_spec.cr diff --git a/spec/deduplication_spec.cr b/spec/deduplication_spec.cr new file mode 100644 index 0000000000..0c4769de69 --- /dev/null +++ b/spec/deduplication_spec.cr @@ -0,0 +1,150 @@ +require "./spec_helper" +require "../src/lavinmq/deduplication.cr" + +describe LavinMQ::Deduplication do + describe LavinMQ::Deduplication::MemoryCache do + it "should have a max size" do + cache = LavinMQ::Deduplication::MemoryCache(String).new(2) + cache.insert("item1") + cache.insert("item2") + cache.insert("item3") + cache.contains?("item1").should be_false + cache.contains?("item2").should be_true + cache.contains?("item3").should be_true + end + + it "should store item without ttl" do + cache = LavinMQ::Deduplication::MemoryCache(String).new(10) + cache.insert("item1") + cache.contains?("item1").should be_true + cache.contains?("item2").should be_false + end + + it "should respect ttl" do + cache = LavinMQ::Deduplication::MemoryCache(String).new(3) + cache.insert("item1", 1) + cache.insert("item2", 300) + cache.insert("item3") + sleep 0.2.seconds + cache.contains?("item1").should be_false + cache.contains?("item2").should be_true + cache.contains?("item3").should be_true + end + end +end + +class MockCache < LavinMQ::Deduplication::Cache(AMQ::Protocol::Field) + @counter = Hash(String, Array({String, UInt32?})).new do |h, k| + h[k] = Array({String, UInt32?}).new + end + + def contains?(key) : Bool + @counter["contains?"] << {key.as(String), nil} + false + end + + def insert(key, ttl = nil) + @counter["insert"] << {key.as(String), ttl} + end + + def calls(key : String) + @counter[key] + end +end + +describe LavinMQ::Deduplication::Deduper do + describe "duplicate?" do + it "should return false if \"x-deduplication-header\" is missing (no identifier, always unique)" do + mock = MockCache.new + deduper = LavinMQ::Deduplication::Deduper.new(mock) + props = LavinMQ::AMQP::Properties.new + msg = LavinMQ::Message.new("ex", "rk", "body", props) + res = deduper.duplicate?(msg) + res.should be_false + end + + it "should check cache if entry exists" do + mock = MockCache.new + deduper = LavinMQ::Deduplication::Deduper.new(mock) + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "x-deduplication-header" => "msg1", + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + deduper.duplicate?(msg) + mock.calls("contains?").size.should eq 1 + end + + it "should only insert into cache if header has a value" do + mock = MockCache.new + deduper = LavinMQ::Deduplication::Deduper.new(mock) + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + deduper.add(msg) + mock.calls("insert").size.should eq 0 + end + + it "should only insert into cache if header has a value" do + mock = MockCache.new + deduper = LavinMQ::Deduplication::Deduper.new(mock) + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "x-deduplication-header" => "msg1", + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + deduper.add(msg) + mock.calls("insert").size.should eq 1 + end + + it "should respect x-cache-ttl on message" do + mock = MockCache.new + deduper = LavinMQ::Deduplication::Deduper.new(mock) + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "x-deduplication-header" => "msg1", + "x-cache-ttl" => 10, + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + deduper.add(msg) + calls = mock.calls("insert") + calls.first[0].should eq "msg1" + calls.first[1].should eq 10 + end + + it "should fallback to default ttl" do + mock = MockCache.new + deduper = LavinMQ::Deduplication::Deduper.new(mock, 12) + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "x-deduplication-header" => "msg1", + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + deduper.add(msg) + calls = mock.calls("insert") + calls.first[0].should eq "msg1" + calls.first[1].should eq 12 + end + + it "should prio message ttl over default ttl" do + mock = MockCache.new + deduper = LavinMQ::Deduplication::Deduper.new(mock, 12) + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "x-deduplication-header" => "msg1", + "x-cache-ttl" => 10, + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + deduper.add(msg) + calls = mock.calls("insert") + calls.first[0].should eq "msg1" + calls.first[1].should eq 10 + end + it "should allow checking any header for dedups" do + mock = MockCache.new + deduper = LavinMQ::Deduplication::Deduper.new(mock, 10, "custom") + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "custom" => "msg1", + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + deduper.add(msg) + calls = mock.calls("insert") + calls.first[0].should eq "msg1" + calls.first[1].should eq 10 + end + end +end diff --git a/spec/exchange_spec.cr b/spec/exchange_spec.cr index 78cce29459..9772662040 100644 --- a/spec/exchange_spec.cr +++ b/spec/exchange_spec.cr @@ -170,4 +170,64 @@ describe LavinMQ::Exchange do end end end + describe "message deduplication" do + it "should handle message deduplication" do + with_amqp_server do |s| + with_channel(s) do |ch| + args = AMQP::Client::Arguments.new({ + "x-message-deduplication" => true, + "x-cache-size" => 10, + }) + ch.exchange("test", "topic", args: args) + ch.queue.bind("test", "#") + ex = s.vhosts["/"].exchanges["test"] + q = s.vhosts["/"].queues.first_value + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "x-deduplication-header" => "msg1", + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + ex.publish(msg, false).should eq 1 + ex.dedup_count.should eq 0 + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "x-deduplication-header" => "msg1", + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + ex.publish(msg, false).should eq 0 + ex.dedup_count.should eq 1 + + q.message_count.should eq 1 + end + end + end + + it "should handle message deduplication, on custom header" do + with_amqp_server do |s| + with_channel(s) do |ch| + args = AMQP::Client::Arguments.new({ + "x-message-deduplication" => true, + "x-cache-size" => 10, + "x-deduplication-header" => "custom", + }) + ch.exchange("test", "topic", args: args) + ch.queue.bind("test", "#") + ex = s.vhosts["/"].exchanges["test"] + q = s.vhosts["/"].queues.first_value + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "custom" => "msg1", + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + ex.publish(msg, false).should eq 1 + ex.dedup_count.should eq 0 + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "custom" => "msg1", + })) + msg = LavinMQ::Message.new("ex", "rk", "body", props) + ex.publish(msg, false).should eq 0 + ex.dedup_count.should eq 1 + + q.message_count.should eq 1 + end + end + end + end end diff --git a/spec/queue_spec.cr b/spec/queue_spec.cr index 4a8860f60c..96fdea40d0 100644 --- a/spec/queue_spec.cr +++ b/spec/queue_spec.cr @@ -424,4 +424,25 @@ describe LavinMQ::AMQP::Queue do FileUtils.rm_rf tmpdir if tmpdir end end + + describe "deduplication" do + it "should not except message if it's a duplicate" do + with_amqp_server do |s| + with_channel(s) do |ch| + queue_name = "dedup-queue" + q1 = ch.queue(queue_name, args: AMQP::Client::Arguments.new({ + "x-message-deduplication" => true, + "x-cache-size" => 10, + })) + props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({ + "x-deduplication-header" => "msg1", + })) + ch.default_exchange.publish_confirm("body", queue_name, props: props) + ch.default_exchange.publish_confirm("body", queue_name, props: props) + q1.get(no_ack: false).not_nil!.body_io.to_s.should eq "body" + q1.get(no_ack: false).should be_nil + end + end + end + end end From f1f0dca2a7d29e164384bdf1e3ce7fc2ebe1813e Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 09:24:27 +0100 Subject: [PATCH 02/12] dedup impl --- src/lavinmq/deduplication.cr | 66 ++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 src/lavinmq/deduplication.cr diff --git a/src/lavinmq/deduplication.cr b/src/lavinmq/deduplication.cr new file mode 100644 index 0000000000..58869e41d2 --- /dev/null +++ b/src/lavinmq/deduplication.cr @@ -0,0 +1,66 @@ +module LavinMQ + module Deduplication + abstract class Cache(T) + abstract def contains?(key : T) : Bool + abstract def insert(key : T, ttl : UInt32?) + end + + class MemoryCache(T) < Cache(T) + def initialize(@size : UInt32) + @store = Hash(T, Int64?).new + end + + def contains?(key : T) : Bool + return false unless @store.has_key?(key) + ttd = @store[key] + return true unless ttd + return true if ttd > RoughTime.unix_ms + @store.delete(key) + false + end + + def insert(key : T, ttl : UInt32? = nil) + @store.shift if @store.size >= @size + @store[key] = ttl ? RoughTime.unix_ms + ttl : nil + end + end + + class Deduper + DEFAULT_HEADER_KEY = "x-deduplication-header" + + def initialize(@cache : Cache(AMQ::Protocol::Field), @default_ttl : UInt32? = nil, + @header_key : String? = nil) + end + + def add(msg : Message) + key = dedup_key(msg) + return unless key + @cache.insert(key, dedup_ttl(msg)) + end + + def duplicate?(msg : Message) : Bool + key = dedup_key(msg) + return false unless key + @cache.contains?(key) + end + + private def dedup_key(msg) + headers = msg.properties.headers + return unless headers + key = @header_key || DEFAULT_HEADER_KEY + headers[key]? + end + + private def dedup_ttl(msg) : UInt32? + headers = msg.properties.headers + def_ttl = @default_ttl + return def_ttl unless headers + value = headers["x-cache-ttl"]? + return def_ttl unless value + value = value.try(&.as?(Int32)) + return def_ttl unless value + value.to_u32 || def_ttl + end + end + end +end From 0f0406ada53801dccc18932dd1e1926c081ab1fd Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 09:26:08 +0100 Subject: [PATCH 03/12] queue impl --- src/lavinmq/amqp/queue/queue.cr | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/lavinmq/amqp/queue/queue.cr b/src/lavinmq/amqp/queue/queue.cr index 67e2382d2a..ccb708df8f 100644 --- a/src/lavinmq/amqp/queue/queue.cr +++ b/src/lavinmq/amqp/queue/queue.cr @@ -112,7 +112,7 @@ module LavinMQ::AMQP # Creates @[x]_count and @[x]_rate and @[y]_log rate_stats( {"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable"}, - {"message_count", "unacked_count"}) + {"message_count", "unacked_count", "dedup"}) getter name, arguments, vhost, consumers, last_get_time getter? auto_delete, exclusive @@ -127,6 +127,7 @@ module LavinMQ::AMQP @data_dir : String Log = LavinMQ::Log.for "queue" @metadata : ::Log::Metadata + @deduper : Deduplication::Deduper? def initialize(@vhost : VHost, @name : String, @exclusive = false, @auto_delete = false, @@ -271,6 +272,14 @@ module LavinMQ::AMQP @single_active_consumer_queue = parse_header("x-single-active-consumer", Bool) == true @consumer_timeout = parse_header("x-consumer-timeout", Int).try &.to_u64 validate_positive("x-consumer-timeout", @consumer_timeout) + if parse_header("x-message-deduplication", Bool) + size = parse_header("x-cache-size", Int).try(&.to_u32) + raise LavinMQ::Error::PreconditionFailed.new("Invalid x-cache-size for message deduplication") unless size + ttl = parse_header("x-cache-ttl", Int).try(&.to_u32) + header_key = parse_header("x-deduplication-header", String) + cache = Deduplication::MemoryCache(AMQ::Protocol::Field).new(size) + @deduper = Deduplication::Deduper.new(cache, ttl, header_key) + end end private macro parse_header(header, type) @@ -421,6 +430,13 @@ module LavinMQ::AMQP def publish(msg : Message) : Bool return false if @deleted || @state.closed? + if d = @deduper + if d.duplicate?(msg) + @dedup_count += 1 + return false + end + d.add(msg) + end reject_on_overflow(msg) @msg_store_lock.synchronize do @msg_store.push(msg) From ccd2c4a24fd2a666718104740a3158df0675e58c Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 09:38:21 +0100 Subject: [PATCH 04/12] exchange impl --- src/lavinmq/amqp/exchange/exchange.cr | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/lavinmq/amqp/exchange/exchange.cr b/src/lavinmq/amqp/exchange/exchange.cr index f8e8685496..6cbe59f388 100644 --- a/src/lavinmq/amqp/exchange/exchange.cr +++ b/src/lavinmq/amqp/exchange/exchange.cr @@ -28,9 +28,10 @@ module LavinMQ @alternate_exchange : String? @delayed_queue : Queue? @deleted = false + @deduper : Deduplication::Deduper? - rate_stats({"publish_in", "publish_out", "unroutable"}) - property publish_in_count, publish_out_count, unroutable_count + rate_stats({"publish_in", "publish_out", "unroutable", "dedup"}) + property publish_in_count, publish_out_count, unroutable_count, dedup_count def initialize(@vhost : VHost, @name : String, @durable = false, @auto_delete = false, @internal = false, @@ -70,6 +71,20 @@ module LavinMQ @delayed = true init_delayed_queue end + if @arguments["x-message-deduplication"]?.try(&.as?(Bool)) + ttl = parse_header("x-cache-ttl", Int).try(&.to_u32) + size = parse_header("x-cache-size", Int).try(&.to_u32) + raise LavinMQ::Error::PreconditionFailed.new("Invalid x-cache-size for message deduplication") unless size + header_key = parse_header("x-deduplication-header", String) + cache = Deduplication::MemoryCache(AMQ::Protocol::Field).new(size) + @deduper = Deduplication::Deduper.new(cache, ttl, header_key) + end + end + + private macro parse_header(header, type) + if value = @arguments["{{ header.id }}"]? + value.as?({{ type }}) || raise LavinMQ::Error::PreconditionFailed.new("{{ header.id }} header not a {{ type.id }}") + end end def details_tuple @@ -146,6 +161,13 @@ module LavinMQ queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new, exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Int32 @publish_in_count += 1 + if d = @deduper + if d.duplicate?(msg) + @dedup_count += 1 + return 0 + end + d.add(msg) + end count = do_publish(msg, immediate, queues, exchanges) @unroutable_count += 1 if count.zero? @publish_out_count += count From ee2dddd5550cd5ba33f24884a59ec570f07d0a3b Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 09:41:15 +0100 Subject: [PATCH 05/12] metrics --- src/lavinmq/http/controller/prometheus.cr | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/lavinmq/http/controller/prometheus.cr b/src/lavinmq/http/controller/prometheus.cr index fb4a85a4cd..52a161a395 100644 --- a/src/lavinmq/http/controller/prometheus.cr +++ b/src/lavinmq/http/controller/prometheus.cr @@ -11,7 +11,8 @@ module LavinMQ NamedTuple(name: String) | NamedTuple(channel: String) | NamedTuple(id: String) | - NamedTuple(queue: String, vhost: String) + NamedTuple(queue: String, vhost: String) | + NamedTuple(exchange: String, vhost: String) alias Metric = NamedTuple(name: String, value: MetricValue) | NamedTuple(name: String, value: MetricValue, labels: MetricLabels) | NamedTuple(name: String, value: MetricValue, help: String) | @@ -97,6 +98,8 @@ module LavinMQ detailed_connection_coarse_metrics(vhosts, writer) when "channel_metrics" detailed_channel_metrics(vhosts, writer) + when "exchange_metrics" + detailed_exchange_metrics(vhosts, writer) end end end @@ -347,6 +350,11 @@ module LavinMQ type: "gauge", labels: labels, help: "Sum of ready and unacknowledged messages - total queue depth"}) + writer.write({name: "detailed_queue_deduplication", + value: q.dedup_count, + type: "count", + labels: labels, + help: "Number of deduplicated messages for this queue"}) end end end @@ -364,6 +372,19 @@ module LavinMQ end end + private def detailed_exchange_metrics(vhosts, writer) + vhosts.each do |vhost| + vhost.exchanges.each_value do |e| + labels = {exchange: e.name, vhost: vhost.name} + writer.write({name: "detailed_queue_deduplication", + value: e.dedup_count, + type: "count", + labels: labels, + help: "Number of deduplicated messages for this queue"}) + end + end + end + private def detailed_connection_coarse_metrics(vhosts, writer) vhosts.each do |vhost| vhost.connections.each do |conn| From 484d110593491e02db18bef53c16ec0242c22ef7 Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 09:42:48 +0100 Subject: [PATCH 06/12] ui --- views/exchanges.ecr | 12 ++++++++++++ views/queues.ecr | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/views/exchanges.ecr b/views/exchanges.ecr index 844d73a095..580df4cc36 100644 --- a/views/exchanges.ecr +++ b/views/exchanges.ecr @@ -85,6 +85,18 @@ Alternate Exchange If messages to this exchange cannot otherwise be routed, send them to the alternate exchange named here. + Message deduplication + Enable deduplication for this exchange + | + Deduplication cache size + Deduplication cache size, in number of entries + | + Deduplication cache ttl + How long an entry lives in the deduplication cache, in milliseconds + | + Deduplication header + Which header to check for deduplication, defaults to x-deduplication-header + diff --git a/views/queues.ecr b/views/queues.ecr index 74a5e8f535..bfe1c27ac2 100644 --- a/views/queues.ecr +++ b/views/queues.ecr @@ -115,6 +115,18 @@ Valid units are Y(ear), M(onth), D(ays), h(ours), m(inutes), s(seconds). Segments are only deleted when new messages are published to the stream queue. + Message deduplication + Enable deduplication for this exchange + | + Deduplication cache size + Deduplication cache size, in number of entries + | + Deduplication cache ttl + How long an entry lives in the deduplication cache, in milliseconds + | + Deduplication header + Which header to check for deduplication, defaults to x-deduplication-header + From 80080f4c7770a5fc87fb305542bb55a158a45e1a Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 11:04:04 +0100 Subject: [PATCH 07/12] queue fixes --- src/lavinmq/amqp/queue/queue.cr | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lavinmq/amqp/queue/queue.cr b/src/lavinmq/amqp/queue/queue.cr index ccb708df8f..2f826d0e2e 100644 --- a/src/lavinmq/amqp/queue/queue.cr +++ b/src/lavinmq/amqp/queue/queue.cr @@ -13,6 +13,7 @@ require "./state" require "./event" require "./message_store" require "../../unacked_message" +require "../../deduplication" module LavinMQ::AMQP class Queue < LavinMQ::Queue @@ -111,8 +112,8 @@ module LavinMQ::AMQP # Creates @[x]_count and @[x]_rate and @[y]_log rate_stats( - {"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable"}, - {"message_count", "unacked_count", "dedup"}) + {"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable", "dedup"}, + {"message_count", "unacked_count"}) getter name, arguments, vhost, consumers, last_get_time getter? auto_delete, exclusive From cb49b4fb27fc6c3f41d008180b5d711314a0718e Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 11:13:40 +0100 Subject: [PATCH 08/12] ameba --- src/lavinmq/http/controller/prometheus.cr | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lavinmq/http/controller/prometheus.cr b/src/lavinmq/http/controller/prometheus.cr index 52a161a395..e8c1276595 100644 --- a/src/lavinmq/http/controller/prometheus.cr +++ b/src/lavinmq/http/controller/prometheus.cr @@ -63,6 +63,7 @@ module LavinMQ vhosts.to_a end + # ameba:disable Metrics/CyclomaticComplexity private def register_routes get "/metrics" do |context, _| context.response.content_type = "text/plain" From 8c1aa11c13d3eaa8cd34be4844f1906cae4a0bd2 Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 11:36:46 +0100 Subject: [PATCH 09/12] convert ms to ns correctly --- src/lavinmq/deduplication.cr | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/lavinmq/deduplication.cr b/src/lavinmq/deduplication.cr index 58869e41d2..d969240ba4 100644 --- a/src/lavinmq/deduplication.cr +++ b/src/lavinmq/deduplication.cr @@ -7,21 +7,26 @@ module LavinMQ class MemoryCache(T) < Cache(T) def initialize(@size : UInt32) - @store = Hash(T, Int64?).new + @store = Hash(T, Time::Span?).new end def contains?(key : T) : Bool return false unless @store.has_key?(key) ttd = @store[key] return true unless ttd - return true if ttd > RoughTime.unix_ms + return true if ttd > RoughTime.monotonic @store.delete(key) false end def insert(key : T, ttl : UInt32? = nil) @store.shift if @store.size >= @size - @store[key] = ttl ? RoughTime.unix_ms + ttl : nil + val = if ttl + RoughTime.monotonic + Time::Span.new(nanoseconds: ttl * 1_000_000) + else + nil + end + @store[key] = val end end From 15de660a02a6c0a9800be8ed578f5f73341f1127 Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 11:36:46 +0100 Subject: [PATCH 10/12] Nicer syntax for working with Time::Span --- src/lavinmq/deduplication.cr | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/lavinmq/deduplication.cr b/src/lavinmq/deduplication.cr index d969240ba4..e9f79e36eb 100644 --- a/src/lavinmq/deduplication.cr +++ b/src/lavinmq/deduplication.cr @@ -7,7 +7,7 @@ module LavinMQ class MemoryCache(T) < Cache(T) def initialize(@size : UInt32) - @store = Hash(T, Time::Span?).new + @store = Hash(T, Time::Span?).new(initial_capacity: @size) end def contains?(key : T) : Bool @@ -21,11 +21,7 @@ module LavinMQ def insert(key : T, ttl : UInt32? = nil) @store.shift if @store.size >= @size - val = if ttl - RoughTime.monotonic + Time::Span.new(nanoseconds: ttl * 1_000_000) - else - nil - end + val = ttl.try { |v| RoughTime.monotonic + v.milliseconds } @store[key] = val end end From 6c8400598f7fe1e4835562ff46d514ff09b2c362 Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 11:36:46 +0100 Subject: [PATCH 11/12] docs --- CHANGELOG.md | 2 ++ README.md | 1 + 2 files changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83fa39a2bb..f40835bfa0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- Support for message deduplication on exchanges and queues [854](https://github.com/cloudamqp/lavinmq/pull/854) + ## [2.1.0] - 2025-01-16 ### Changed diff --git a/README.md b/README.md index 1a12920a17..f0f5269f68 100644 --- a/README.md +++ b/README.md @@ -226,6 +226,7 @@ For questions or suggestions: - Replication - Stream queues - Automatic leader election in clusters via etcd +- Message deduplication ### Known differences to other AMQP servers From 439306f2b99890fa78027b9d07720d9ff857cfac Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Fri, 17 Jan 2025 11:36:46 +0100 Subject: [PATCH 12/12] mutex in cache --- src/lavinmq/deduplication.cr | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/lavinmq/deduplication.cr b/src/lavinmq/deduplication.cr index e9f79e36eb..fd3cd0d49f 100644 --- a/src/lavinmq/deduplication.cr +++ b/src/lavinmq/deduplication.cr @@ -7,22 +7,27 @@ module LavinMQ class MemoryCache(T) < Cache(T) def initialize(@size : UInt32) + @lock = Mutex.new @store = Hash(T, Time::Span?).new(initial_capacity: @size) end def contains?(key : T) : Bool - return false unless @store.has_key?(key) - ttd = @store[key] - return true unless ttd - return true if ttd > RoughTime.monotonic - @store.delete(key) - false + @lock.synchronize do + return false unless @store.has_key?(key) + ttd = @store[key] + return true unless ttd + return true if ttd > RoughTime.monotonic + @store.delete(key) + false + end end def insert(key : T, ttl : UInt32? = nil) - @store.shift if @store.size >= @size - val = ttl.try { |v| RoughTime.monotonic + v.milliseconds } - @store[key] = val + @lock.synchronize do + @store.shift if @store.size >= @size + val = ttl.try { |v| RoughTime.monotonic + v.milliseconds } + @store[key] = val + end end end