-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Management UI: extend Get Message feature to streams #11030
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -7,6 +7,7 @@ | |||||
|
||||||
-module(rabbit_mgmt_wm_queue_get). | ||||||
|
||||||
-include_lib("kernel/include/logger.hrl"). | ||||||
-export([init/2, resource_exists/2, is_authorized/2, allow_missing_post/2, | ||||||
allowed_methods/2, accept_content/2, content_types_provided/2, | ||||||
content_types_accepted/2]). | ||||||
|
@@ -47,34 +48,124 @@ accept_content(ReqData, Context) -> | |||||
do_it(ReqData0, Context) -> | ||||||
VHost = rabbit_mgmt_util:vhost(ReqData0), | ||||||
Q = rabbit_mgmt_util:id(queue, ReqData0), | ||||||
rabbit_mgmt_util:with_decode( | ||||||
[ackmode, count, encoding], ReqData0, Context, | ||||||
fun([AckModeBin, CountBin, EncBin], Body, ReqData) -> | ||||||
rabbit_mgmt_util:with_channel( | ||||||
VHost, ReqData, Context, | ||||||
fun (Ch) -> | ||||||
AckMode = list_to_atom(binary_to_list(AckModeBin)), | ||||||
Count = rabbit_mgmt_util:parse_int(CountBin), | ||||||
Enc = case EncBin of | ||||||
<<"auto">> -> auto; | ||||||
<<"base64">> -> base64; | ||||||
_ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>}) | ||||||
end, | ||||||
Trunc = case maps:get(truncate, Body, undefined) of | ||||||
undefined -> none; | ||||||
TruncBin -> rabbit_mgmt_util:parse_int( | ||||||
TruncBin) | ||||||
end, | ||||||
|
||||||
Reply = basic_gets(Count, Ch, Q, AckMode, Enc, Trunc), | ||||||
maybe_return(Reply, Ch, AckMode), | ||||||
rabbit_mgmt_util:reply(remove_delivery_tag(Reply), | ||||||
ReqData, Context) | ||||||
end) | ||||||
end). | ||||||
Resource = rabbit_misc:r(<<"/">>, queue, Q), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
{ok, Queue} = rabbit_amqqueue:lookup(Resource), | ||||||
case amqqueue:get_type(Queue) of | ||||||
rabbit_stream_queue -> | ||||||
rabbit_mgmt_util:with_decode( | ||||||
[count, encoding, offset], ReqData0, Context, | ||||||
fun([CountBin, EncBin, OffsetBin], Body, ReqData) -> | ||||||
rabbit_mgmt_util:with_channel( | ||||||
VHost, ReqData, Context, | ||||||
fun (Ch) -> | ||||||
Count = rabbit_mgmt_util:parse_int(CountBin), | ||||||
Enc = case EncBin of | ||||||
<<"auto">> -> auto; | ||||||
<<"base64">> -> base64; | ||||||
_ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>}) | ||||||
end, | ||||||
Offset = rabbit_mgmt_util:parse_int(OffsetBin), | ||||||
Trunc = case maps:get(truncate, Body, undefined) of | ||||||
undefined -> none; | ||||||
TruncBin -> rabbit_mgmt_util:parse_int( | ||||||
TruncBin) | ||||||
end, | ||||||
CTag = <<"ctag">>, | ||||||
Reply = start_subscription_gets( | ||||||
Count, Ch, Q, CTag, Offset, Enc, Trunc), | ||||||
rabbit_mgmt_util:reply(remove_delivery_tag(Reply), | ||||||
ReqData, Context) | ||||||
end) | ||||||
end); | ||||||
_ -> | ||||||
rabbit_mgmt_util:with_decode( | ||||||
[ackmode, count, encoding], ReqData0, Context, | ||||||
fun([AckModeBin, CountBin, EncBin], Body, ReqData) -> | ||||||
rabbit_mgmt_util:with_channel( | ||||||
VHost, ReqData, Context, | ||||||
fun (Ch) -> | ||||||
AckMode = list_to_atom(binary_to_list(AckModeBin)), | ||||||
Count = rabbit_mgmt_util:parse_int(CountBin), | ||||||
Enc = case EncBin of | ||||||
<<"auto">> -> auto; | ||||||
<<"base64">> -> base64; | ||||||
_ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>}) | ||||||
end, | ||||||
Trunc = case maps:get(truncate, Body, undefined) of | ||||||
undefined -> none; | ||||||
TruncBin -> rabbit_mgmt_util:parse_int( | ||||||
TruncBin) | ||||||
end, | ||||||
Reply = basic_gets(Count, Ch, Q, AckMode, Enc, | ||||||
Trunc), | ||||||
maybe_return(Reply, Ch, AckMode), | ||||||
rabbit_mgmt_util:reply(remove_delivery_tag(Reply), | ||||||
ReqData, Context) | ||||||
end) | ||||||
end) | ||||||
end. | ||||||
|
||||||
start_subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc) -> | ||||||
qos(Ch, Count), | ||||||
subscribe(Ch, Queue, false, Offset, CTag), | ||||||
Replies = subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc), | ||||||
cancel_subscription(Ch, CTag), | ||||||
Replies. | ||||||
|
||||||
subscription_gets(0, _Ch, _Queue, _CTag, _Offset, _Enc, _Trunc) -> | ||||||
[]; | ||||||
subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc) -> | ||||||
case subscription_get(Ch, Enc, Trunc) of | ||||||
none -> []; | ||||||
Reply -> [Reply | subscription_gets(Count - 1, Ch, Queue, CTag, Offset, Enc, Trunc)] | ||||||
end. | ||||||
|
||||||
subscription_get(Ch, Enc, Trunc) -> | ||||||
receive | ||||||
{#'basic.deliver'{redelivered = Redelivered, | ||||||
exchange = Exchange, | ||||||
routing_key = RoutingKey, | ||||||
delivery_tag = DeliveryTag, | ||||||
consumer_tag = ConsumerTag}, | ||||||
#amqp_msg{props = Props, payload = Payload}} -> | ||||||
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dont think we want to ack the message. If the stream has 20 messages, and we ask for 10 (prefetch = 10) when we ack the first message, the broker will send the 11th message. (Prefetch count means the max number of outstanding ie unacknowledged messages) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feature should not arguably use AMQP 0-9-1 at all. And the original feature should never have used an AMQP 0-9-1 Erlang client to begin with. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it would probably be better to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dont say that an amqp connection should be used to implement this feature, just want to mention the aspect, or implicit benefit that the server side of an amqp connection does additional authorization checks. Would like to cite the recent attempt for the delete queue endpoint to use an internal API #9550 which was reverted #10062 (comment). Maybe can you recall what was the difficulty, why the path outlined in #10062 was abandoned? Or maybe that restriction is not applicable for a simple reading from a queue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like a way to not move anywhere.. why would not using the 0-9-1 protocol be ok expect it's maybe not 100% optimal? This solves a real world problem with minimal change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
multiple = false}), | ||||||
[{payload_bytes, size(Payload)}, | ||||||
{redelivered, Redelivered}, | ||||||
{exchange, Exchange}, | ||||||
{routing_key, RoutingKey}, | ||||||
{consumer_tag, ConsumerTag}, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consumer tag is just hardcoded "ctag" so it does not need to be included. |
||||||
{properties, rabbit_mgmt_format:basic_properties(Props)}] ++ | ||||||
payload_part(maybe_truncate(Payload, Trunc), Enc) | ||||||
after | ||||||
300 -> | ||||||
none | ||||||
end. | ||||||
|
||||||
subscribe(Ch, Queue, NoAck, Offset, CTag) -> | ||||||
amqp_channel:subscribe( | ||||||
Ch, | ||||||
#'basic.consume'{queue = Queue, | ||||||
no_ack = NoAck, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NoAck is always false so we can hardcode it I think |
||||||
consumer_tag = CTag, | ||||||
arguments = [{<<"x-stream-offset">>, long, Offset}]}, | ||||||
self()), | ||||||
receive | ||||||
#'basic.consume_ok'{consumer_tag = CTag} -> | ||||||
ok | ||||||
end. | ||||||
|
||||||
qos(Ch, Prefetch) -> | ||||||
#'basic.qos_ok'{} = amqp_channel:call( | ||||||
Ch, | ||||||
#'basic.qos'{global = false, prefetch_count = Prefetch}). | ||||||
|
||||||
cancel_subscription(Ch, CTag) -> | ||||||
amqp_channel:call( | ||||||
Ch, | ||||||
#'basic.cancel'{ | ||||||
consumer_tag = CTag, | ||||||
nowait = false}). | ||||||
|
||||||
basic_gets(0, _, _, _, _, _) -> | ||||||
[]; | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this used anywhere? or just a leftover from debugging?