Skip to content

Commit

Permalink
Merge pull request #479 from TarsCloud/perf/lbbniu/transport
Browse files Browse the repository at this point in the history
perf: connection closed, try to reconnect once
  • Loading branch information
lbbniu authored Jun 16, 2023
2 parents 82d7d73 + 10d6e9e commit d0696c7
Showing 1 changed file with 39 additions and 31 deletions.
70 changes: 39 additions & 31 deletions tars/transport/tarsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type connection struct {
client *TarsClient

conn net.Conn
connLock *sync.Mutex
connLock sync.Mutex

isClosed bool
idleTime time.Time
Expand All @@ -66,7 +66,7 @@ func NewTarsClient(address string, protocol ClientProtocol, config *TarsClientCo
sendQueue: make(chan sendMsg, config.QueueLen),
sendFailQueue: make(chan sendMsg, 1),
}
client.conn = &connection{client: client, isClosed: true, connLock: &sync.Mutex{}, dialTimeout: config.DialTimeout}
client.conn = &connection{client: client, isClosed: true, dialTimeout: config.DialTimeout}
return client
}

Expand Down Expand Up @@ -122,6 +122,35 @@ func (tc *TarsClient) GraceClose(ctx context.Context) {
}
}

func (c *connection) ReConnect() (err error) {
c.connLock.Lock()
defer c.connLock.Unlock()
if c.isClosed {
TLOG.Debug("Connect:", c.client.address, "Proto:", c.client.config.Proto)
if c.client.config.Proto == "ssl" {
dialer := &net.Dialer{Timeout: c.dialTimeout}
c.conn, err = tls.DialWithDialer(dialer, "tcp", c.client.address, c.client.config.TlsConfig)
} else {
c.conn, err = net.DialTimeout(c.client.config.Proto, c.client.address, c.dialTimeout)
}

if err != nil {
return err
}
if c.client.config.Proto == "tcp" {
if c.conn != nil {
_ = c.conn.(*net.TCPConn).SetKeepAlive(true)
}
}
c.idleTime = time.Now()
c.isClosed = false
connDone := make(chan bool, 1)
go c.recv(c.conn, connDone)
go c.send(c.conn, connDone)
}
return nil
}

func (c *connection) send(conn net.Conn, connDone chan bool) {
var m sendMsg
t := time.NewTicker(time.Second)
Expand Down Expand Up @@ -164,6 +193,14 @@ func (c *connection) send(conn net.Conn, connDone chan bool) {
TLOG.Errorf("send request retry: %d, error: %v", m.retry, err)
c.client.sendFailQueue <- m
c.close(conn)
if err != net.ErrClosed {
return
}

// connection closed, try to reconnect once
if err = c.ReConnect(); err != nil {
TLOG.Errorf("send request reconnect error: %v", err)
}
return
}
}
Expand Down Expand Up @@ -226,35 +263,6 @@ func (c *connection) recv(conn net.Conn, connDone chan bool) {
}
}

func (c *connection) ReConnect() (err error) {
c.connLock.Lock()
defer c.connLock.Unlock()
if c.isClosed {
TLOG.Debug("Connect:", c.client.address, "Proto:", c.client.config.Proto)
if c.client.config.Proto == "ssl" {
dialer := &net.Dialer{Timeout: c.dialTimeout}
c.conn, err = tls.DialWithDialer(dialer, "tcp", c.client.address, c.client.config.TlsConfig)
} else {
c.conn, err = net.DialTimeout(c.client.config.Proto, c.client.address, c.dialTimeout)
}

if err != nil {
return err
}
if c.client.config.Proto == "tcp" {
if c.conn != nil {
_ = c.conn.(*net.TCPConn).SetKeepAlive(true)
}
}
c.idleTime = time.Now()
c.isClosed = false
connDone := make(chan bool, 1)
go c.recv(c.conn, connDone)
go c.send(c.conn, connDone)
}
return nil
}

func (c *connection) close(conn net.Conn) {
c.connLock.Lock()
defer c.connLock.Unlock()
Expand Down

0 comments on commit d0696c7

Please sign in to comment.