Skip to content

Commit

Permalink
add timeout argument to AsyncSend(); fix variable name bug;
Browse files Browse the repository at this point in the history
  • Loading branch information
bg5sbk committed Feb 6, 2015
1 parent 795cd4f commit 59ccbcc
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 37 deletions.
13 changes: 8 additions & 5 deletions broadcast.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package link

import "github.com/funny/sync"
import (
"github.com/funny/sync"
"time"
)

// Broadcaster.
type Broadcaster struct {
Expand All @@ -24,7 +27,7 @@ func NewBroadcaster(protocol ProtocolState, fetcher func(func(*Session))) *Broad

// Broadcast to sessions. The message only encoded once
// so the performance is better than send message one by one.
func (b *Broadcaster) Broadcast(message Message) ([]BroadcastWork, error) {
func (b *Broadcaster) Broadcast(message Message, timeout time.Duration) ([]BroadcastWork, error) {
buffer := newOutBuffer()
b.protocol.PrepareOutBuffer(buffer, message.OutBufferSize())
if err := message.WriteOutBuffer(buffer); err != nil {
Expand All @@ -37,7 +40,7 @@ func (b *Broadcaster) Broadcast(message Message) ([]BroadcastWork, error) {
buffer.broadcastUse()
works = append(works, BroadcastWork{
session,
session.asyncSendBuffer(buffer),
session.asyncSendBuffer(buffer, timeout),
})
})
return works, nil
Expand Down Expand Up @@ -71,8 +74,8 @@ func NewChannel(protocol Protocol, side ProtocolSide) *Channel {

// Broadcast to channel. The message only encoded once
// so the performance is better than send message one by one.
func (channel *Channel) Broadcast(message Message) ([]BroadcastWork, error) {
return channel.broadcaster.Broadcast(message)
func (channel *Channel) Broadcast(message Message, timeout time.Duration) ([]BroadcastWork, error) {
return channel.broadcaster.Broadcast(message, timeout)
}

// How mush sessions in this channel.
Expand Down
8 changes: 4 additions & 4 deletions examples/broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ func main() {
for {
time.Sleep(time.Second * 2)
// broadcast to server sessions
server.Broadcast(link.String("server say: " + time.Now().String()))
server.Broadcast(link.String("server say: "+time.Now().String()), 0)
// broadcast to channel sessions
channel.Broadcast(link.String("channel say: " + time.Now().String()))
channel.Broadcast(link.String("channel say: "+time.Now().String()), 0)
}
}()

Expand All @@ -33,8 +33,8 @@ func main() {

session.Process(func(msg *link.InBuffer) error {
channel.Broadcast(link.String(
"client " + session.Conn().RemoteAddr().String() + " say: " + string(msg.Data),
))
"client "+session.Conn().RemoteAddr().String()+" say: "+string(msg.Data),
), 0)
return nil
})

Expand Down
2 changes: 1 addition & 1 deletion rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error
buffer.WriteString(names[1])
buffer.WriteUint32LE(seqNum)
return json.NewEncoder(buffer).Encode(args)
}))
}), 0)

return <-c
}
Expand Down
5 changes: 3 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/funny/sync"
"net"
"sync/atomic"
"time"
)

// Errors
Expand Down Expand Up @@ -76,8 +77,8 @@ func (server *Server) Protocol() Protocol {

// Broadcast to channel. The message only encoded once
// so the performance is better than send message one by one.
func (server *Server) Broadcast(message Message) ([]BroadcastWork, error) {
return server.broadcaster.Broadcast(message)
func (server *Server) Broadcast(message Message, timeout time.Duration) ([]BroadcastWork, error) {
return server.broadcaster.Broadcast(message, timeout)
}

// Accept incoming connection once.
Expand Down
60 changes: 35 additions & 25 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ type Session struct {
sendMutex sync.Mutex
asyncSendChan chan asyncMessage
asyncSendBufferChan chan asyncBuffer
inBufferMutex sync.Mutex
inBuffer *InBuffer
outBuffer *OutBuffer
outBufferMutex sync.Mutex

// About session close
closeChan chan int
Expand Down Expand Up @@ -137,8 +137,8 @@ func (session *Session) Close() {

// Sync send a message. This method will block on IO.
func (session *Session) Send(message Message) error {
session.inBufferMutex.Lock()
defer session.inBufferMutex.Unlock()
session.outBufferMutex.Lock()
defer session.outBufferMutex.Unlock()

var err error

Expand Down Expand Up @@ -226,48 +226,58 @@ func (session *Session) sendLoop() {
}

// Async send a message.
func (session *Session) AsyncSend(message Message) AsyncWork {
func (session *Session) AsyncSend(message Message, timeout time.Duration) AsyncWork {
c := make(chan error, 1)
if session.IsClosed() {
c <- SendToClosedError
} else {
select {
case session.asyncSendChan <- asyncMessage{c, message}:
default:
go func() {
select {
case session.asyncSendChan <- asyncMessage{c, message}:
case <-session.closeChan:
c <- SendToClosedError
case <-time.After(time.Second * 5):
session.Close()
c <- AsyncSendTimeoutError
}
}()
if timeout == 0 {
session.Close()
c <- AsyncSendTimeoutError
} else {
go func() {
select {
case session.asyncSendChan <- asyncMessage{c, message}:
case <-session.closeChan:
c <- SendToClosedError
case <-time.After(time.Second * 5):
session.Close()
c <- AsyncSendTimeoutError
}
}()
}
}
}
return AsyncWork{c}
}

// Async send a packet.
func (session *Session) asyncSendBuffer(buffer *OutBuffer) AsyncWork {
func (session *Session) asyncSendBuffer(buffer *OutBuffer, timeout time.Duration) AsyncWork {
c := make(chan error, 1)
if session.IsClosed() {
c <- SendToClosedError
} else {
select {
case session.asyncSendBufferChan <- asyncBuffer{c, buffer}:
default:
go func() {
select {
case session.asyncSendBufferChan <- asyncBuffer{c, buffer}:
case <-session.closeChan:
c <- SendToClosedError
case <-time.After(time.Second * 5):
session.Close()
c <- AsyncSendTimeoutError
}
}()
if timeout == 0 {
session.Close()
c <- AsyncSendTimeoutError
} else {
go func() {
select {
case session.asyncSendBufferChan <- asyncBuffer{c, buffer}:
case <-session.closeChan:
c <- SendToClosedError
case <-time.After(time.Second * 5):
session.Close()
c <- AsyncSendTimeoutError
}
}()
}
}
}
return AsyncWork{c}
Expand Down

0 comments on commit 59ccbcc

Please sign in to comment.