Skip to content

Commit

Permalink
Emit events on stream consume and cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Jan 20, 2025
1 parent a8c8cf2 commit 31a4d61
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 58 deletions.
12 changes: 11 additions & 1 deletion deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ consume(Q, Spec, #stream_client{} = QState0)
consumer_tag := ConsumerTag,
exclusive_consume := ExclusiveConsume,
args := Args,
ok_msg := OkMsg} = Spec,
ok_msg := OkMsg,
acting_user := ActingUser} = Spec,
QName = amqqueue:get_name(Q),
rabbit_log:debug("~s:~s Local pid resolved ~0p",
[?MODULE, ?FUNCTION_NAME, LocalPid]),
Expand All @@ -330,6 +331,15 @@ consume(Q, Spec, #stream_client{} = QState0)
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired,
QName, ConsumerPrefetchCount, true, up, Args),
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, ExclusiveConsume},
{ack_required, AckRequired},
{channel, ChPid},
{queue, QName},
{prefetch_count, ConsumerPrefetchCount},
{arguments, Args},
{user_who_performed_action, ActingUser}]),
%% reply needs to be sent before the stream
%% begins sending
maybe_send_reply(ChPid, OkMsg),
Expand Down
59 changes: 59 additions & 0 deletions deps/rabbit/test/rabbit_list_test_event_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_list_test_event_handler).

-behaviour(gen_event).

-export([start_link/0, stop/0, get_events/0, clear_events/0]).

%% callbacks
-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).

start_link() ->
gen_event:start_link({local, ?MODULE}).

stop() ->
gen_event:stop(?MODULE).

get_events() ->
gen_event:call(?MODULE, ?MODULE, get_events).

clear_events() ->
gen_event:call(?MODULE, ?MODULE, clear_events).

%% Callbacks

init([]) ->
{ok, []}.

handle_event(Event, State) ->
{ok, [Event | State]}.

handle_call(get_events, State) ->
{ok, lists:reverse(State), State};
handle_call(clear_events, _) ->
{ok, ok, []}.

handle_info(_Info, State) ->
{ok, State}.

terminate(_Reason, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.
58 changes: 57 additions & 1 deletion deps/rabbit/test/rabbit_stream_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ all_tests_3() ->
consume_credit_multiple_ack,
basic_cancel,
consumer_metrics_cleaned_on_connection_close,
consume_cancel_should_create_events,
receive_basic_cancel_on_queue_deletion,
keep_consuming_on_leader_restart,
max_length_bytes,
Expand Down Expand Up @@ -1195,7 +1196,7 @@ consumer_metrics_cleaned_on_connection_close(Config) ->
Conn = rabbit_ct_client_helpers:open_connection(Config, Server),
{ok, Ch} = amqp_connection:open_channel(Conn),
qos(Ch, 10, false),
CTag = <<"consumer_metrics_cleaned_on_connection_close">>,
CTag = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
subscribe(Ch, Q, false, 0, CTag),
rabbit_ct_helpers:await_condition(
fun() ->
Expand All @@ -1211,6 +1212,49 @@ consumer_metrics_cleaned_on_connection_close(Config) ->

rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).

consume_cancel_should_create_events(Config) ->
HandlerMod = rabbit_list_test_event_handler,
rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, HandlerMod),
rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
add_handler,
[rabbit_event, HandlerMod, []]),
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),

Conn = rabbit_ct_client_helpers:open_connection(Config, Server),
{ok, Ch} = amqp_connection:open_channel(Conn),
qos(Ch, 10, false),

ok = rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
call,
[rabbit_event, HandlerMod, clear_events]),

CTag = rabbit_data_coercion:to_binary(?FUNCTION_NAME),

?assertEqual([], filtered_events(Config, consumer_created, CTag)),
?assertEqual([], filtered_events(Config, consumer_deleted, CTag)),

subscribe(Ch, Q, false, 0, CTag),

?awaitMatch([{event, consumer_created, _, _, _}], filtered_events(Config, consumer_created, CTag), ?WAIT),
?assertEqual([], filtered_events(Config, consumer_deleted, CTag)),

amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),

?awaitMatch([{event, consumer_deleted, _, _, _}], filtered_events(Config, consumer_deleted, CTag), ?WAIT),

rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
delete_handler,
[rabbit_event, HandlerMod, []]),

ok = rabbit_ct_client_helpers:close_connection(Conn),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).

receive_basic_cancel_on_queue_deletion(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down Expand Up @@ -1395,6 +1439,18 @@ filter_consumers(Config, Server, CTag) ->
end
end, [], CInfo).


filtered_events(Config, EventType, CTag) ->
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
call,
[rabbit_event, rabbit_list_test_event_handler, get_events]),
lists:filter(fun({event, Type, Fields, _, _}) when Type =:= EventType ->
proplists:get_value(consumer_tag, Fields) =:= CTag;
(_) ->
false
end, Events).

consume_and_reject(Config) ->
consume_and_(Config, fun (DT) -> #'basic.reject'{delivery_tag = DT} end).
consume_and_nack(Config) ->
Expand Down
40 changes: 29 additions & 11 deletions deps/rabbitmq_stream/src/rabbit_stream_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

%% API
-export([init/0]).
-export([consumer_created/9,
-export([consumer_created/10,
consumer_updated/9,
consumer_cancelled/4]).
consumer_cancelled/5]).
-export([publisher_created/4,
publisher_updated/7,
publisher_deleted/3]).
Expand All @@ -42,7 +42,8 @@ consumer_created(Connection,
Offset,
OffsetLag,
Active,
Properties) ->
Properties,
ActingUser) ->
Values =
[{credits, Credits},
{consumed, MessageCount},
Expand All @@ -55,16 +56,32 @@ consumer_created(Connection,
ets:insert(?TABLE_CONSUMER,
{{StreamResource, Connection, SubscriptionId}, Values}),
rabbit_global_counters:consumer_created(stream),
rabbit_core_metrics:consumer_created(Connection,
consumer_tag(SubscriptionId),
false,
false,
CTag = consumer_tag(SubscriptionId),
ExclusiveConsume = false,
AckRequired = false,
Pid = Connection,
PrefetchCount = 0,
Args = rabbit_misc:to_amqp_table(Properties),
rabbit_core_metrics:consumer_created(Pid,
CTag,
ExclusiveConsume,
AckRequired,
StreamResource,
0,
PrefetchCount,
Active,
rabbit_stream_utils:consumer_activity_status(Active,
Properties),
rabbit_misc:to_amqp_table(Properties)),
Args),

rabbit_event:notify(consumer_created,
[{consumer_tag, CTag},
{exclusive, ExclusiveConsume},
{ack_required, AckRequired},
{channel, Pid},
{queue, StreamResource},
{prefetch_count, PrefetchCount},
{arguments, Args},
{user_who_performed_action, ActingUser}]),
ok.

consumer_tag(SubscriptionId) ->
Expand Down Expand Up @@ -104,7 +121,7 @@ consumer_updated(Connection,

ok.

consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser, Notify) ->
ets:delete(?TABLE_CONSUMER,
{StreamResource, Connection, SubscriptionId}),
rabbit_global_counters:consumer_deleted(stream),
Expand All @@ -115,7 +132,8 @@ consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
true ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, consumer_tag(SubscriptionId)},
{channel, self()}, {queue, StreamResource}]);
{channel, self()}, {queue, StreamResource},
{user_who_performed_action, ActingUser}]);
_ -> ok
end,
ok.
Expand Down
69 changes: 30 additions & 39 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2924,9 +2924,8 @@ consumer_name(_Properties) ->
maybe_dispatch_on_subscription(Transport,
State,
ConsumerState,
#stream_connection{deliver_version =
DeliverVersion} =
Connection,
#stream_connection{deliver_version = DeliverVersion,
user = #user{username = Username}} = Connection,
Consumers,
Stream,
SubscriptionId,
Expand Down Expand Up @@ -2970,13 +2969,14 @@ maybe_dispatch_on_subscription(Transport,
ConsumerOffset,
ConsumerOffsetLag,
true,
SubscriptionProperties),
SubscriptionProperties,
Username),
State#stream_connection_state{consumers = Consumers1}
end;
maybe_dispatch_on_subscription(_Transport,
State,
ConsumerState,
Connection,
#stream_connection{user = #user{username = Username}} = Connection,
Consumers,
Stream,
SubscriptionId,
Expand All @@ -3000,7 +3000,8 @@ maybe_dispatch_on_subscription(_Transport,
Offset,
0, %% offset lag
Active,
SubscriptionProperties),
SubscriptionProperties,
Username),
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
State#stream_connection_state{consumers = Consumers1}.

Expand Down Expand Up @@ -3205,19 +3206,15 @@ partition_index(VirtualHost, Stream, Properties) ->
-1
end.

notify_connection_closed(#statem_data{connection =
#stream_connection{name = Name,
publishers =
Publishers} =
Connection,
connection_state =
#stream_connection_state{consumers =
Consumers} =
ConnectionState}) ->
notify_connection_closed(#statem_data{
connection = #stream_connection{name = Name,
user = #user{username = Username},
publishers = Publishers} = Connection,
connection_state = #stream_connection_state{consumers = Consumers} = ConnectionState}) ->
rabbit_core_metrics:connection_closed(self()),
[rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(S, Connection),
SubId, false)
SubId, Username, false)
|| #consumer{configuration =
#consumer_configuration{stream = S,
subscription_id = SubId}}
Expand Down Expand Up @@ -3275,24 +3272,15 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport
end, {Connection, State}, Partitions).

clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
#stream_connection{virtual_host =
VirtualHost,
stream_subscriptions
=
StreamSubscriptions,
publishers =
Publishers,
publisher_to_ids
=
PublisherToIds,
stream_leaders =
Leaders,
outstanding_requests = Requests0} =
C0,
#stream_connection_state{consumers
=
Consumers} =
S0) ->
#stream_connection{
user = #user{username = Username},
virtual_host = VirtualHost,
stream_subscriptions = StreamSubscriptions,
publishers = Publishers,
publisher_to_ids = PublisherToIds,
stream_leaders = Leaders,
outstanding_requests = Requests0} = C0,
#stream_connection_state{consumers = Consumers} = S0) ->
{SubscriptionsCleaned, C1, S1} =
case stream_has_subscriptions(Stream, C0) of
true ->
Expand All @@ -3306,6 +3294,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
stream_r(Stream,
C0),
SubId,
Username,
false),
maybe_unregister_consumer(
VirtualHost, Consumer,
Expand All @@ -3317,6 +3306,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
stream_r(Stream,
C0),
SubId,
Username,
false),
maybe_unregister_consumer(
VirtualHost, Consumer,
Expand Down Expand Up @@ -3429,11 +3419,11 @@ lookup_leader_from_manager(VirtualHost, Stream) ->
rabbit_stream_manager:lookup_leader(VirtualHost, Stream).

remove_subscription(SubscriptionId,
#stream_connection{virtual_host = VirtualHost,
outstanding_requests = Requests0,
stream_subscriptions =
StreamSubscriptions} =
Connection,
#stream_connection{
user = #user{username = Username},
virtual_host = VirtualHost,
outstanding_requests = Requests0,
stream_subscriptions = StreamSubscriptions} = Connection,
#stream_connection_state{consumers = Consumers} = State,
Notify) ->
#{SubscriptionId := Consumer} = Consumers,
Expand Down Expand Up @@ -3462,6 +3452,7 @@ remove_subscription(SubscriptionId,
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream, Connection2),
SubscriptionId,
Username,
Notify),

Requests1 = maybe_unregister_consumer(
Expand Down
Loading

0 comments on commit 31a4d61

Please sign in to comment.