Federated MQTT topic exchange with MQTT 5.0 subscribers #13040

Open
ansd opened this issue Jan 8, 2025 · 7 comments
Comments

@ansd
Member

ansd commented Jan 8, 2025

Describe the bug

Federation crashes if the MQTT topic exchange (amq.topic by default) is federated and MQTT 5.0 clients are subscribed. That's because bindings are sent from downstream to upstream, as described in https://www.rabbitmq.com/docs/federated-exchanges#details, and binding arguments containing the Erlang record mqtt_subscription_opts cannot be encoded in AMQP 0.9.1.

Reproduction steps

See #13033 (comment)

Expected behavior

Federation should work.

Additional context

Exporting definitions in JSON format also omits the mqtt_subscription_opts record from the binding arguments.

One solution is to encode the MQTT subscription options as an AMQP 0.9.1 table in the binding arguments when storing the binding in the database.

@sergio-aguilar-tuhh

Hi @ansd, could you share more details on how to implement that?

Any guidance or partial patch would be helpful.

Thanks again for your time.

@michaelklishin
Member

@sergio-aguilar-tuhh as explained in Discussions, this is not a trivial change and we do not know how exactly such non-standard bindings should be propagated.

You are welcome to investigate what the options are. Like I said, the binding propagation approach federation uses has been stable for 15 years. We certainly won't rush any possible solution.

Converting MQTT subscription details into an AMQP 0-9-1 table is trivial. The question is primarily how to transfer and apply them on the other side of the link without breaking backwards compatibility. Federation is fairly commonly used between clusters that run different versions.

rabbit_federation_exchange_link is the relevant module in the federation plugin.

@sergio-aguilar-tuhh

sergio-aguilar-tuhh commented Jan 9, 2025

Could you let me know how I could help to fix this issue?

I’m eager to contribute in any way possible to help resolve this problem.

@michaelklishin
Member

@sergio-aguilar-tuhh take a look at the functions linked to above and try to come up with a design that would allow such non-AMQP 0-9-1 bindings to be propagated without a lot of special casing and complexity, and with backwards compatibility with older versions when such bindings are not present.

@ansd
Member Author

ansd commented Jan 10, 2025

Hi @sergio-aguilar-tuhh

One solution is to encode the MQTT subscription options as an AMQP 0.9.1 table in the binding arguments when storing the binding in the database.

This encoding/mapping should happen here. As @michaelklishin wrote, mixed version clusters need to work, i.e. RabbitMQ nodes containing the new code clustered with RabbitMQ nodes containing the old code (during a rolling update to the new version). Also, subscriptions (with subscription options) created before the rolling update need to be present (with the same subscription option semantics) after the rolling update.

@sergio-aguilar-tuhh

Hi @michaelklishin and @ansd,

After applying the proposed changes to the binding_args_for_proto_ver/3 function, MQTT v5 communication now works between different modules:

Original Implementation:

binding_args_for_proto_ver(?MQTT_PROTO_V5, TopicFilter, SubOpts) ->
    BindingKey = mqtt_to_amqp(TopicFilter),
    [SubOpts, {<<"x-binding-key">>, longstr, BindingKey}].

Modified Implementation:

binding_args_for_proto_ver(?MQTT_PROTO_V5, TopicFilter,
                           #mqtt_subscription_opts{qos = QoS,
                                                   no_local = NoLocal,
                                                   retain_as_published = Rap,
                                                   retain_handling = Rh,
                                                   id = Id}) ->
    %% Convert the topic filter to an AMQP routing key:
    BindingKey = mqtt_to_amqp(TopicFilter),

    %% Build an AMQP table for MQTT subscription options:
    %% - 'signedint' vs. 'long' vs. 'short' depends on your desired type
    %% - Filter out or skip fields if they are `undefined`
    BaseSubOptsTable = [
        {<<"qos">>,                 signedint, QoS},
        {<<"no_local">>,            bool,      NoLocal},
        {<<"retain_as_published">>, bool,      Rap},
        {<<"retain_handling">>,     signedint, Rh}
    ],

    %% Add subscription_id only when one is set
    SubOptsTable =
        case Id of
            undefined ->
                BaseSubOptsTable;
            _ ->
                BaseSubOptsTable ++ [{<<"subscription_id">>, signedint, Id}]
        end,

    %% Combine with the binding key. Federation sees a table instead of a raw record.
    [
        {<<"mqtt_subscription_opts">>, table,   SubOptsTable},
        {<<"x-binding-key">>,          longstr, BindingKey}
    ].
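
For the receiving side, the inverse mapping could look roughly like the sketch below. It assumes the table layout from the modified implementation above. The module name `sub_opts_table`, its function names, and the local copy of the record (including its field defaults) are hypothetical stand-ins, not the MQTT plugin's actual code. The reader also accepts the old shape, where the record itself sits in the binding arguments, since bindings created before a rolling upgrade would still look like that.

```erlang
-module(sub_opts_table).
-export([to_table/1, from_binding_args/1]).

%% Hypothetical local stand-in for the MQTT plugin's record.
%% The field defaults are assumptions made for this sketch.
-record(mqtt_subscription_opts, {qos = 0,
                                 no_local = false,
                                 retain_as_published = false,
                                 retain_handling = 0,
                                 id = undefined}).

%% Record -> AMQP 0-9-1 table entries (same layout as in the comment above).
to_table(#mqtt_subscription_opts{qos = QoS,
                                 no_local = NoLocal,
                                 retain_as_published = Rap,
                                 retain_handling = Rh,
                                 id = Id}) ->
    Base = [{<<"qos">>, signedint, QoS},
            {<<"no_local">>, bool, NoLocal},
            {<<"retain_as_published">>, bool, Rap},
            {<<"retain_handling">>, signedint, Rh}],
    case Id of
        undefined -> Base;
        _ -> Base ++ [{<<"subscription_id">>, signedint, Id}]
    end.

%% Binding arguments -> record, or `undefined` if no options are present.
%% Tries the new table shape first, then the old raw-record shape.
from_binding_args(Args) ->
    case lists:keyfind(<<"mqtt_subscription_opts">>, 1, Args) of
        {_, table, Table} ->
            #mqtt_subscription_opts{
               qos = field(<<"qos">>, Table, 0),
               no_local = field(<<"no_local">>, Table, false),
               retain_as_published = field(<<"retain_as_published">>, Table, false),
               retain_handling = field(<<"retain_handling">>, Table, 0),
               id = field(<<"subscription_id">>, Table, undefined)};
        false ->
            %% A record is a tuple tagged with its name in position 1.
            case lists:keyfind(mqtt_subscription_opts, 1, Args) of
                #mqtt_subscription_opts{} = Opts -> Opts;
                false -> undefined
            end
    end.

%% Look up one table entry by key, falling back to a default.
field(Key, Table, Default) ->
    case lists:keyfind(Key, 1, Table) of
        {_, _Type, Value} -> Value;
        false -> Default
    end.
```

This only covers reading both shapes on a node that runs the new code. It does not answer what a node running the old code does with the table.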

@michaelklishin
Member

@sergio-aguilar-tuhh yes, but does the opposite end create a correct binding? To avoid an exception you can use term_to_binary/1 to convert any Erlang data structure into a binary, which can be sent as a single table field.

The question is: how will the opposite end know what to do with it? Older versions won't, so another question is: is this acceptable? What would be the consequences?
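
The term_to_binary/1 idea can be sketched as below. This is a minimal illustration under stated assumptions, not the plugin's code: the module name `sub_opts_blob`, the argument key `x-mqtt-subscription-opts`, and the choice of the `binary` field type are all hypothetical.

```erlang
-module(sub_opts_blob).
-export([encode/1, decode/1]).

%% Hypothetical argument key, chosen for this sketch only.
-define(KEY, <<"x-mqtt-subscription-opts">>).

%% Wrap any Erlang term as one opaque field-table entry.
encode(SubOpts) ->
    {?KEY, binary, term_to_binary(SubOpts)}.

%% Find the entry in the binding arguments and turn it back into a term.
%% The `safe` option makes binary_to_term/2 refuse to create new atoms
%% from data that arrived from a remote peer.
%% Returns `undefined` when the entry is absent.
decode(Args) ->
    case lists:keyfind(?KEY, 1, Args) of
        {_, binary, Bin} -> binary_to_term(Bin, [safe]);
        false -> undefined
    end.
```

The entry is opaque to anything that is not Erlang code aware of the key, so the compatibility questions above still apply: this avoids the encoding exception but does not by itself make an older node honour the options.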
