Skip to content

Commit

Permalink
Merge pull request #478 from TarsCloud/perf/lbbniu/transport
Browse files Browse the repository at this point in the history
perf(transport): optimize logging and normalize variable naming
  • Loading branch information
lbbniu authored Jun 4, 2023
2 parents dae512c + 04f0b47 commit 82d7d73
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 158 deletions.
5 changes: 3 additions & 2 deletions tars/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"sync/atomic"
"time"

"go.uber.org/automaxprocs/maxprocs"

"github.com/TarsCloud/TarsGo/tars/protocol"
"github.com/TarsCloud/TarsGo/tars/protocol/res/adminf"
"github.com/TarsCloud/TarsGo/tars/transport"
Expand All @@ -26,7 +28,6 @@ import (
"github.com/TarsCloud/TarsGo/tars/util/rogger"
"github.com/TarsCloud/TarsGo/tars/util/ssl"
"github.com/TarsCloud/TarsGo/tars/util/tools"
"go.uber.org/automaxprocs/maxprocs"
)

type destroyableImp interface {
Expand Down Expand Up @@ -73,7 +74,6 @@ var (
)

func init() {
_, _ = maxprocs.Set(maxprocs.Logger(TLOG.Infof))
rogger.SetLevel(rogger.ERROR)

defaultApp = newApp()
Expand Down Expand Up @@ -328,6 +328,7 @@ func (a *application) initConfig() {
a.clientObjTlsConfig[objName] = objTlsConfig
}
}
_, _ = maxprocs.Set(maxprocs.Logger(TLOG.Infof))
}

// Run the application
Expand Down
8 changes: 8 additions & 0 deletions tars/transport/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ const (
PackageError
)

// ServerHandler is interface with listen and handler method
type ServerHandler interface {
Listen() error
Handle() error
OnShutdown()
CloseIdles(n int64) bool
}

// ServerProtocol is interface for handling the server side tars package.
type ServerProtocol interface {
Invoke(ctx context.Context, pkg []byte) []byte
Expand Down
76 changes: 39 additions & 37 deletions tars/transport/tarsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ type TarsClient struct {
// TODO remove it
conn *connection

cp ClientProtocol
conf *TarsClientConf
protocol ClientProtocol
config *TarsClientConf
sendQueue chan sendMsg
sendFailQueue chan sendMsg
// recvQueue chan []byte
}

type sendMsg struct {
Expand All @@ -44,7 +43,7 @@ type sendMsg struct {
}

type connection struct {
tc *TarsClient
client *TarsClient

conn net.Conn
connLock *sync.Mutex
Expand All @@ -56,19 +55,19 @@ type connection struct {
}

// NewTarsClient new tars client and init it .
func NewTarsClient(address string, cp ClientProtocol, conf *TarsClientConf) *TarsClient {
if conf.QueueLen <= 0 {
conf.QueueLen = 100
func NewTarsClient(address string, protocol ClientProtocol, config *TarsClientConf) *TarsClient {
if config.QueueLen <= 0 {
config.QueueLen = 100
}
tc := &TarsClient{
conf: conf,
client := &TarsClient{
config: config,
address: address,
cp: cp,
sendQueue: make(chan sendMsg, conf.QueueLen),
protocol: protocol,
sendQueue: make(chan sendMsg, config.QueueLen),
sendFailQueue: make(chan sendMsg, 1),
}
tc.conn = &connection{tc: tc, isClosed: true, connLock: &sync.Mutex{}, dialTimeout: conf.DialTimeout}
return tc
client.conn = &connection{client: client, isClosed: true, connLock: &sync.Mutex{}, dialTimeout: config.DialTimeout}
return client
}

// ReConnect established the client connection with the server.
Expand All @@ -84,25 +83,24 @@ func (tc *TarsClient) Send(req []byte) error {

// avoid full sendQueue that cause sending block
var timerC <-chan struct{}
if tc.conf.WriteTimeout > 0 {
timerC = rtimer.After(tc.conf.WriteTimeout)
if tc.config.WriteTimeout > 0 {
timerC = rtimer.After(tc.config.WriteTimeout)
}

select {
case <-timerC:
return errors.New("tars client write timeout")
case tc.sendQueue <- sendMsg{req: req}:
return nil
}

return nil
}

// Close the client connection with the server.
func (tc *TarsClient) Close() {
w := tc.conn
if !w.isClosed && w.conn != nil {
w.isClosed = true
w.conn.Close()
_ = w.conn.Close()
}
}

Expand Down Expand Up @@ -136,33 +134,35 @@ func (c *connection) send(conn net.Conn, connDone chan bool) {
}
// get sendMsg
select {
case m = <-c.tc.sendFailQueue: // Send failure queue messages first
case m = <-c.client.sendFailQueue: // Send failure queue messages first
default:
select {
case m = <-c.tc.sendQueue: // Fetch jobs
case m = <-c.client.sendQueue: // Fetch jobs
case <-t.C:
if c.isClosed {
return
}
// TODO: check one-way invoke for idle detect
if c.invokeNum == 0 && c.idleTime.Add(c.tc.conf.IdleTimeout).Before(time.Now()) {
if c.invokeNum == 0 && c.idleTime.Add(c.client.config.IdleTimeout).Before(time.Now()) {
c.close(conn)
return
}
continue
}
}
atomic.AddInt32(&c.invokeNum, 1)
if c.tc.conf.WriteTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(c.tc.conf.WriteTimeout))
if c.client.config.WriteTimeout != 0 {
if err := conn.SetWriteDeadline(time.Now().Add(c.client.config.WriteTimeout)); err != nil {
TLOG.Errorf("set write deadline error: %v", err)
}
}
c.idleTime = time.Now()
_, err := conn.Write(m.req)
if err != nil {
// TODO add retry times
m.retry++
c.tc.sendFailQueue <- m
TLOG.Errorf("send request retry: %d, error: %v", m.retry, err)
c.client.sendFailQueue <- m
c.close(conn)
return
}
Expand All @@ -178,12 +178,14 @@ func (c *connection) recv(conn net.Conn, connDone chan bool) {
var n int
var err error
for {
if c.tc.conf.ReadTimeout != 0 {
conn.SetReadDeadline(time.Now().Add(c.tc.conf.ReadTimeout))
if c.client.config.ReadTimeout != 0 {
if err = conn.SetReadDeadline(time.Now().Add(c.client.config.ReadTimeout)); err != nil {
TLOG.Errorf("set read deadline error: %v", err)
}
}
n, err = conn.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() && netErr.Temporary() {
if isNoDataError(err) {
continue // no data, not error
}
if _, ok := err.(*net.OpError); ok {
Expand All @@ -194,14 +196,14 @@ func (c *connection) recv(conn net.Conn, connDone chan bool) {
if err == io.EOF {
TLOG.Debugf("connection closed by remote: %v, error: %v", conn.RemoteAddr(), err)
} else {
TLOG.Error("read package error:", err)
TLOG.Errorf("read package error: %v", err)
}
c.close(conn)
return
}
currBuffer = append(currBuffer, buffer[:n]...)
for {
pkgLen, status := c.tc.cp.ParsePackage(currBuffer)
pkgLen, status := c.client.protocol.ParsePackage(currBuffer)
if status == PackageLess {
break
}
Expand All @@ -210,7 +212,7 @@ func (c *connection) recv(conn net.Conn, connDone chan bool) {
pkg := make([]byte, pkgLen)
copy(pkg, currBuffer[0:pkgLen])
currBuffer = currBuffer[pkgLen:]
go c.tc.cp.Recv(pkg)
go c.client.protocol.Recv(pkg)
if len(currBuffer) > 0 {
continue
}
Expand All @@ -228,20 +230,20 @@ func (c *connection) ReConnect() (err error) {
c.connLock.Lock()
defer c.connLock.Unlock()
if c.isClosed {
TLOG.Debug("Connect:", c.tc.address, "Proto:", c.tc.conf.Proto)
if c.tc.conf.Proto == "ssl" {
TLOG.Debug("Connect:", c.client.address, "Proto:", c.client.config.Proto)
if c.client.config.Proto == "ssl" {
dialer := &net.Dialer{Timeout: c.dialTimeout}
c.conn, err = tls.DialWithDialer(dialer, "tcp", c.tc.address, c.tc.conf.TlsConfig)
c.conn, err = tls.DialWithDialer(dialer, "tcp", c.client.address, c.client.config.TlsConfig)
} else {
c.conn, err = net.DialTimeout(c.tc.conf.Proto, c.tc.address, c.dialTimeout)
c.conn, err = net.DialTimeout(c.client.config.Proto, c.client.address, c.dialTimeout)
}

if err != nil {
return err
}
if c.tc.conf.Proto == "tcp" {
if c.client.config.Proto == "tcp" {
if c.conn != nil {
c.conn.(*net.TCPConn).SetKeepAlive(true)
_ = c.conn.(*net.TCPConn).SetKeepAlive(true)
}
}
c.idleTime = time.Now()
Expand All @@ -258,6 +260,6 @@ func (c *connection) close(conn net.Conn) {
defer c.connLock.Unlock()
c.isClosed = true
if conn != nil {
conn.Close()
_ = conn.Close()
}
}
38 changes: 15 additions & 23 deletions tars/transport/tarsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ import (
// TLOG is logger for transport.
var TLOG = rogger.GetLogger("TLOG")

// ServerHandler is interface with listen and handler method
type ServerHandler interface {
Listen() error
Handle() error
OnShutdown()
CloseIdles(n int64) bool
}

// TarsServerConf server config for tars server side.
type TarsServerConf struct {
Proto string
Expand All @@ -39,30 +31,30 @@ type TarsServerConf struct {

// TarsServer tars server struct.
type TarsServer struct {
svr ServerProtocol
conf *TarsServerConf
protocol ServerProtocol
config *TarsServerConf
handle ServerHandler
lastInvoke time.Time
isClosed int32
numInvoke int32
numConn int32
}

// NewTarsServer new TarsServer and init with conf.
func NewTarsServer(svr ServerProtocol, conf *TarsServerConf) *TarsServer {
ts := &TarsServer{svr: svr, conf: conf}
// NewTarsServer new TarsServer and init with config.
func NewTarsServer(protocol ServerProtocol, config *TarsServerConf) *TarsServer {
ts := &TarsServer{protocol: protocol, config: config}
ts.isClosed = 0
ts.lastInvoke = time.Now()
return ts
}

func (ts *TarsServer) getHandler() (sh ServerHandler) {
if ts.conf.Proto == "tcp" {
sh = &tcpHandler{conf: ts.conf, ts: ts}
} else if ts.conf.Proto == "udp" {
sh = &udpHandler{conf: ts.conf, ts: ts}
if ts.config.Proto == "tcp" {
sh = &tcpHandler{config: ts.config, server: ts}
} else if ts.config.Proto == "udp" {
sh = &udpHandler{config: ts.config, server: ts}
} else {
panic("unsupport protocol: " + ts.conf.Proto)
panic("unsupport protocol: " + ts.config.Proto)
}
return
}
Expand Down Expand Up @@ -105,7 +97,7 @@ func (ts *TarsServer) Shutdown(ctx context.Context) error {

// GetConfig gets the tars server config.
func (ts *TarsServer) GetConfig() *TarsServerConf {
return ts.conf
return ts.config
}

// IsZombie show whether the server is hanged by the request.
Expand All @@ -115,19 +107,19 @@ func (ts *TarsServer) IsZombie(timeout time.Duration) bool {
}

func (ts *TarsServer) invoke(ctx context.Context, pkg []byte) []byte {
cfg := ts.conf
cfg := ts.config
var rsp []byte
if cfg.HandleTimeout == 0 {
rsp = ts.svr.Invoke(ctx, pkg)
rsp = ts.protocol.Invoke(ctx, pkg)
} else {
invokeDone, cancelFunc := context.WithTimeout(context.Background(), cfg.HandleTimeout)
go func() {
rsp = ts.svr.Invoke(ctx, pkg)
rsp = ts.protocol.Invoke(ctx, pkg)
cancelFunc()
}()
<-invokeDone.Done()
if len(rsp) == 0 { // The rsp must be none-empty
rsp = ts.svr.InvokeTimeout(pkg)
rsp = ts.protocol.InvokeTimeout(pkg)
}
}
return rsp
Expand Down
Loading

0 comments on commit 82d7d73

Please sign in to comment.