Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffering messages locally if broker is unavailable? #236

Open
emiltin opened this issue Dec 15, 2023 · 10 comments
Open

Buffering messages locally if broker is unavailable? #236

emiltin opened this issue Dec 15, 2023 · 10 comments

Comments

@emiltin
Copy link

emiltin commented Dec 15, 2023

Are outgoing messages buffered locally if the broker is unaviable?

If yes, how many messages can be stored, and are they stored only in RAM, or is there a way to persist them on disk?

@qzhuyan
Copy link
Contributor

qzhuyan commented Jan 10, 2024

In general, I suggest the caller of emqtt buffer the messages to simplify the error handlings in emqtt.

with --reconnect, the emqtt will just stay in reconnect state and all other message publishing calls/casts are buffered.

if you do sync publish QoS 1 message, the caller get blocked if peer node is down.
if you do async publish, they are buffered in RAM.

For references you could check the mqtt bridge impl. in EMQX.

@emiltin
Copy link
Author

emiltin commented Jan 10, 2024

I assume --reconnect refers to the CLI tool? I'm using emqtt as a library. Does it work the same when using the reconnect option?

If I understand you correctly, emqtt will buffere messages in RAM if you do an async publish while it's trying to reconnect?

Can you say more about why you recommend buffering messages outside emqtt?

@qzhuyan
Copy link
Contributor

qzhuyan commented Jan 11, 2024

I assume --reconnect refers to the CLI tool? I'm using emqtt as a library. Does it work the same when using the reconnect option?

yes. reconnect option should work when you use mqtt as lib.

If I understand you correctly, emqtt will buffere messages in RAM if you do an async publish while it's trying to reconnect?

yes. all publishs are postponed until connected so they are kind buffered in emqtt process

Can you say more about why you recommend buffering messages outside emqtt?

There are different preferences what to do when transport is in trouble or broker is unreachable.
(below, caller means the caller of emqtt)

a. How many messages should be buffered? what is the limit?
b. Should it only buffer QoS > 0 messages?
c. When there is a queue built up, should the emqtt client drop QoS0 messages to avoid space grows?
d. when the transport is recovered, there is a big queue or there is congestion, do you really want to deliver all the buffered messages? should it drop 'expired' messages and how?
e. should the caller get notice when peer is unreachable?
f. Should caller prefer connect to different peer instead of retrying on the same peer (In this case with reconnect option)?
....

handle above features are out of scope of a mqtt protocol client and emqtt runs isolated in a process, it provides callbacks when messages are delivered/acked by peer and the caller is free to implement the feature above.

@emiltin
Copy link
Author

emiltin commented Jan 11, 2024

Thank you for the reply, very helpful.
If I implement buffering myself, I will need to know when a message has been delivered. You mention callbacks, but I don't see them mentioned anywhere in the readme. Is there any additional documenation or guides that cover how to use callbacks?

@qzhuyan
Copy link
Contributor

qzhuyan commented Jan 11, 2024

take a look here:

publish_async(Client, Via, Msg, infinity, {fun ?MODULE:sync_publish_result/3, [self(), Mref]}),

@emiltin
Copy link
Author

emiltin commented Jan 17, 2024

Thank you for the link. I'm using emqtt from an Elixir app, so reading Erlang is bit challenging for me.
Is there any documentation about callbacks, apart from the code? Or maybe you can point to Elixir examples of using callbacks?

@emiltin
Copy link
Author

emiltin commented Jan 17, 2024

if you do async publish, they are buffered in RAM

Does it store only the last message send to each topic?

@qzhuyan
Copy link
Contributor

qzhuyan commented Jan 26, 2024

Thank you for the link. I'm using emqtt from an Elixir app, so reading Erlang is bit challenging for me.
Is there any documentation about callbacks, apart from the code? Or maybe you can point to Elixir examples of using callbacks?

emqtt/src/emqtt.erl
Line 527 in c815a18
publish_async(Client, Via, Msg, infinity, {fun ?MODULE:sync_publish_result/3, [self(), Mref]}),

that is you called the publish_async function with last arg defines a two element tuple.
the 1st element defines a callback function fun ?MODULE:sync_publish_result/3 which is defined in the same module and function name is sync_publish_result which takes three args.

the 2nd element is a list of args that will be used to call the callback function, the emqtt will append the 3rd args with Result.

Result is a map with following keys:

#{packet_id => PacketId,
    reason_code => ReasonCode,
    reason_code_name => reason_code_name(ReasonCode),
    properties => Properties,
    via => Via}

@qzhuyan
Copy link
Contributor

qzhuyan commented Jan 26, 2024

Does it store only the last message send to each topic?

depends on the connection states and QoS, message maybe dropped due to expired if msg expired is set.

@emiltin
Copy link
Author

emiltin commented Jan 30, 2024

Thank you for the explanation.

depends on the connection states and QoS, message maybe dropped due to expired if msg expired is set.

Ok, but otherwise multiple outgoing messages are stored for ewach topic, not just the last?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants