diff --git a/broadcast.go b/broadcast.go index 625125b..dd1494f 100644 --- a/broadcast.go +++ b/broadcast.go @@ -1,6 +1,9 @@ package link -import "github.com/funny/sync" +import ( + "github.com/funny/sync" + "time" +) // Broadcaster. type Broadcaster struct { @@ -24,7 +27,7 @@ func NewBroadcaster(protocol ProtocolState, fetcher func(func(*Session))) *Broad // Broadcast to sessions. The message only encoded once // so the performance is better than send message one by one. -func (b *Broadcaster) Broadcast(message Message) ([]BroadcastWork, error) { +func (b *Broadcaster) Broadcast(message Message, timeout time.Duration) ([]BroadcastWork, error) { buffer := newOutBuffer() b.protocol.PrepareOutBuffer(buffer, message.OutBufferSize()) if err := message.WriteOutBuffer(buffer); err != nil { @@ -37,7 +40,7 @@ func (b *Broadcaster) Broadcast(message Message) ([]BroadcastWork, error) { buffer.broadcastUse() works = append(works, BroadcastWork{ session, - session.asyncSendBuffer(buffer), + session.asyncSendBuffer(buffer, timeout), }) }) return works, nil @@ -71,8 +74,8 @@ func NewChannel(protocol Protocol, side ProtocolSide) *Channel { // Broadcast to channel. The message only encoded once // so the performance is better than send message one by one. -func (channel *Channel) Broadcast(message Message) ([]BroadcastWork, error) { - return channel.broadcaster.Broadcast(message) +func (channel *Channel) Broadcast(message Message, timeout time.Duration) ([]BroadcastWork, error) { + return channel.broadcaster.Broadcast(message, timeout) } // How mush sessions in this channel. diff --git a/examples/broadcast/main.go b/examples/broadcast/main.go index 41862c9..fdf8fb3 100644 --- a/examples/broadcast/main.go +++ b/examples/broadcast/main.go @@ -19,9 +19,9 @@ func main() { for { time.Sleep(time.Second * 2) // broadcast to server sessions - server.Broadcast(link.String("server say: " + time.Now().String())) + server.Broadcast(link.String("server say: "+time.Now().String()), 0) // broadcast to channel sessions - channel.Broadcast(link.String("channel say: " + time.Now().String())) + channel.Broadcast(link.String("channel say: "+time.Now().String()), 0) } }() @@ -33,8 +33,8 @@ func main() { session.Process(func(msg *link.InBuffer) error { channel.Broadcast(link.String( - "client " + session.Conn().RemoteAddr().String() + " say: " + string(msg.Data), - )) + "client "+session.Conn().RemoteAddr().String()+" say: "+string(msg.Data), + ), 0) return nil }) diff --git a/rpc/client.go b/rpc/client.go index 7d45eb5..58ce183 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -75,7 +75,7 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error buffer.WriteString(names[1]) buffer.WriteUint32LE(seqNum) return json.NewEncoder(buffer).Encode(args) - })) + }), 0) return <-c } diff --git a/server.go b/server.go index 3874456..eeb47dd 100644 --- a/server.go +++ b/server.go @@ -5,6 +5,7 @@ import ( "github.com/funny/sync" "net" "sync/atomic" + "time" ) // Errors @@ -76,8 +77,8 @@ func (server *Server) Protocol() Protocol { // Broadcast to channel. The message only encoded once // so the performance is better than send message one by one. -func (server *Server) Broadcast(message Message) ([]BroadcastWork, error) { - return server.broadcaster.Broadcast(message) +func (server *Server) Broadcast(message Message, timeout time.Duration) ([]BroadcastWork, error) { + return server.broadcaster.Broadcast(message, timeout) } // Accept incoming connection once. diff --git a/session.go b/session.go index 38388fd..b628361 100644 --- a/session.go +++ b/session.go @@ -46,9 +46,9 @@ type Session struct { sendMutex sync.Mutex asyncSendChan chan asyncMessage asyncSendBufferChan chan asyncBuffer - inBufferMutex sync.Mutex inBuffer *InBuffer outBuffer *OutBuffer + outBufferMutex sync.Mutex // About session close closeChan chan int @@ -137,8 +137,8 @@ func (session *Session) Close() { // Sync send a message. This method will block on IO. func (session *Session) Send(message Message) error { - session.inBufferMutex.Lock() - defer session.inBufferMutex.Unlock() + session.outBufferMutex.Lock() + defer session.outBufferMutex.Unlock() var err error @@ -226,7 +226,7 @@ func (session *Session) sendLoop() { } // Async send a message. -func (session *Session) AsyncSend(message Message) AsyncWork { +func (session *Session) AsyncSend(message Message, timeout time.Duration) AsyncWork { c := make(chan error, 1) if session.IsClosed() { c <- SendToClosedError @@ -234,23 +234,28 @@ func (session *Session) AsyncSend(message Message) AsyncWork { select { case session.asyncSendChan <- asyncMessage{c, message}: default: - go func() { - select { - case session.asyncSendChan <- asyncMessage{c, message}: - case <-session.closeChan: - c <- SendToClosedError - case <-time.After(time.Second * 5): - session.Close() - c <- AsyncSendTimeoutError - } - }() + if timeout == 0 { + session.Close() + c <- AsyncSendTimeoutError + } else { + go func() { + select { + case session.asyncSendChan <- asyncMessage{c, message}: + case <-session.closeChan: + c <- SendToClosedError + case <-time.After(time.Second * 5): + session.Close() + c <- AsyncSendTimeoutError + } + }() + } } } return AsyncWork{c} } // Async send a packet. -func (session *Session) asyncSendBuffer(buffer *OutBuffer) AsyncWork { +func (session *Session) asyncSendBuffer(buffer *OutBuffer, timeout time.Duration) AsyncWork { c := make(chan error, 1) if session.IsClosed() { c <- SendToClosedError @@ -258,16 +263,21 @@ func (session *Session) asyncSendBuffer(buffer *OutBuffer) AsyncWork { select { case session.asyncSendBufferChan <- asyncBuffer{c, buffer}: default: - go func() { - select { - case session.asyncSendBufferChan <- asyncBuffer{c, buffer}: - case <-session.closeChan: - c <- SendToClosedError - case <-time.After(time.Second * 5): - session.Close() - c <- AsyncSendTimeoutError - } - }() + if timeout == 0 { + session.Close() + c <- AsyncSendTimeoutError + } else { + go func() { + select { + case session.asyncSendBufferChan <- asyncBuffer{c, buffer}: + case <-session.closeChan: + c <- SendToClosedError + case <-time.After(time.Second * 5): + session.Close() + c <- AsyncSendTimeoutError + } + }() + } } } return AsyncWork{c}