Skip to content

Commit

Permalink
Move cl.WriteLoop() to attachClient() and call cl.Stop() in loadClien…
Browse files Browse the repository at this point in the history
…ts() to update client.State. (#344)

* Moving go cl.WriteLoop() out of NewClient() and placing it in server.attachClient().

* Call cl.Stop() to cancel the context, update cl.State with information such as disconnected time, and set the stopCause.

* update README-CN.md

---------

Co-authored-by: JB <[email protected]>
  • Loading branch information
werbenhu and mochi-co authored Dec 22, 2023
1 parent 5523d15 commit b2ab984
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ server := mqtt.New(&mqtt.Options{
| 数据持久性 | [mochi-mqtt/server/hooks/storage/redis](hooks/storage/redis/redis.go) | 使用 [Redis](https://redis.io) 进行持久性存储。 |
| 调试跟踪 | [mochi-mqtt/server/hooks/debug](hooks/debug/debug.go) | 调试输出以查看数据包在服务端的链路追踪。 |

许多内部函数都已开放给开发者,你可以参考上述示例创建自己的Hook钩子。如果你有更好的关于Hook钩子方面的建议或者疑问,你可以[提交问题](https://github.com/mochi-mqtt/server/issues)给我们。 |
许多内部函数都已开放给开发者,你可以参考上述示例创建自己的Hook钩子。如果你有更好的关于Hook钩子方面的建议或者疑问,你可以[提交问题](https://github.com/mochi-mqtt/server/issues)给我们。

### 访问控制(Access Control)

Expand Down
7 changes: 5 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,6 @@ func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool)
// By default, we don't want to restrict developer publishes,
// but if you do, reset this after creating inline client.
cl.State.Inflight.ResetReceiveQuota(math.MaxInt32)
} else {
go cl.WriteLoop() // can only write to real clients
}

return cl
Expand Down Expand Up @@ -332,6 +330,8 @@ func (s *Server) EstablishConnection(listener string, c net.Conn) error {
func (s *Server) attachClient(cl *Client, listener string) error {
defer s.Listeners.ClientsWg.Done()
s.Listeners.ClientsWg.Add(1)

go cl.WriteLoop()
defer cl.Stop(nil)

pk, err := s.readConnectionPacket(cl)
Expand Down Expand Up @@ -1554,6 +1554,9 @@ func (s *Server) loadClients(v []storage.Client) {
}
cl.Properties.Will = Will(c.Will)

// cancel the context, update cl.State such as disconnected time and stopCause.
cl.Stop(packets.ErrServerShuttingDown)

expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
s.hooks.OnDisconnect(cl, packets.ErrServerShuttingDown, expire)
if expire {
Expand Down

0 comments on commit b2ab984

Please sign in to comment.