Skip to content

Commit

Permalink
*: reduce overhead of codec client (#1555)
Browse files Browse the repository at this point in the history
 

Signed-off-by: zyguan <[email protected]>
  • Loading branch information
zyguan authored Jan 17, 2025
1 parent e9d868c commit 2076492
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 18 deletions.
9 changes: 1 addition & 8 deletions internal/apicodec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ func DecodeKey(encoded []byte, version kvrpcpb.APIVersion) ([]byte, []byte, erro
return nil, nil, errors.Errorf("unsupported api version %s", version.String())
}

func attachAPICtx(c Codec, req *tikvrpc.Request) *tikvrpc.Request {
// Shallow copy the request to avoid concurrent modification.
r := *req

func setAPICtx(c Codec, r *tikvrpc.Request) {
r.Context.ApiVersion = c.GetAPIVersion()
r.Context.KeyspaceId = uint32(c.GetKeyspaceID())

Expand All @@ -114,8 +111,4 @@ func attachAPICtx(c Codec, req *tikvrpc.Request) *tikvrpc.Request {
compact.ApiVersion = r.Context.ApiVersion
r.Req = &compact
}

tikvrpc.AttachContext(&r, r.Context)

return &r
}
19 changes: 15 additions & 4 deletions internal/apicodec/codec_v1.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package apicodec

import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/keyspacepb"
Expand All @@ -9,19 +11,24 @@ import (
)

type codecV1 struct {
reqPool sync.Pool
memCodec memCodec
}

// NewCodecV1 returns a codec that can be used to encode/decode
// keys and requests to and from APIv1 format.
func NewCodecV1(mode Mode) Codec {
var codec *codecV1
switch mode {
case ModeRaw:
return &codecV1{memCodec: &defaultMemCodec{}}
codec = &codecV1{memCodec: &defaultMemCodec{}}
case ModeTxn:
return &codecV1{memCodec: &memComparableCodec{}}
codec = &codecV1{memCodec: &memComparableCodec{}}
default:
panic("unknown mode")
}
panic("unknown mode")
codec.reqPool.New = func() any { return &tikvrpc.Request{} }
return codec
}

func (c *codecV1) GetAPIVersion() kvrpcpb.APIVersion {
Expand All @@ -37,10 +44,14 @@ func (c *codecV1) GetKeyspaceID() KeyspaceID {
}

func (c *codecV1) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) {
return attachAPICtx(c, req), nil
r := c.reqPool.Get().(*tikvrpc.Request)
*r = *req
setAPICtx(c, r)
return r, nil
}

func (c *codecV1) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) {
defer c.reqPool.Put(req)
regionError, err := resp.GetRegionError()
// If GetRegionError returns error, it means the response does not contain region error to decode,
// therefore we skip decoding and return the response as is.
Expand Down
10 changes: 8 additions & 2 deletions internal/apicodec/codec_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"sync"

"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/errorpb"
Expand Down Expand Up @@ -50,6 +51,7 @@ func BuildKeyspaceName(name string) string {

// codecV2 is used to encode/decode keys and request into APIv2 format.
type codecV2 struct {
reqPool sync.Pool
prefix []byte
endKey []byte
memCodec memCodec
Expand Down Expand Up @@ -85,6 +87,7 @@ func NewCodecV2(mode Mode, keyspaceMeta *keyspacepb.KeyspaceMeta) (Codec, error)
copy(codec.prefix[1:], prefix)
prefixVal := binary.BigEndian.Uint32(codec.prefix)
binary.BigEndian.PutUint32(codec.endKey, prefixVal+1)
codec.reqPool.New = func() any { return &tikvrpc.Request{} }
return codec, nil
}

Expand Down Expand Up @@ -122,8 +125,10 @@ func (c *codecV2) GetAPIVersion() kvrpcpb.APIVersion {
// EncodeRequest encodes with the given Codec.
// NOTE: req is reused on retry. MUST encode on cloned request, other than overwrite the original.
func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) {
// attachAPICtx will shallow copy the request.
req = attachAPICtx(c, req)
r := c.reqPool.Get().(*tikvrpc.Request)
*r = *req
setAPICtx(c, r)
req = r
// Encode requests based on command type.
switch req.Type {
// Transaction Request Types.
Expand Down Expand Up @@ -289,6 +294,7 @@ func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error)

// DecodeResponse decode the resp with the given codec.
func (c *codecV2) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) {
defer c.reqPool.Put(req)
var err error
// Decode response based on command type.
switch req.Type {
Expand Down
2 changes: 2 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,8 @@ func (c *RPCClient) updateSendReqHistogramAndExecStats(req *tikvrpc.Request, res
}

func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
tikvrpc.AttachContext(req, req.Context)

var spanRPC opentracing.Span
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
spanRPC = span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))
Expand Down
2 changes: 1 addition & 1 deletion internal/client/client_collapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r reqCollapse) tryCollapseRequest(ctx context.Context, addr string, req *t
return
}
canCollapse = true
key := strconv.FormatUint(resolveLock.Context.RegionId, 10) + "-" + strconv.FormatUint(resolveLock.StartVersion, 10)
key := strconv.FormatUint(req.Context.RegionId, 10) + "-" + strconv.FormatUint(resolveLock.StartVersion, 10)
resp, err = r.collapse(ctx, key, &resolveRegionSf, addr, req, timeout)
return
default:
Expand Down
4 changes: 2 additions & 2 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,15 @@ func TestCollapseResolveLock(t *testing.T) {
CommitVersion: commitTS,
Keys: keys,
})
tikvrpc.SetContext(req, region, nil)
tikvrpc.SetContextNoAttach(req, region, nil)
return req
}
buildBatchResolveLockReq := func(regionID uint64, txnInfos []*kvrpcpb.TxnInfo) *tikvrpc.Request {
region := &metapb.Region{Id: regionID}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{
TxnInfos: txnInfos,
})
tikvrpc.SetContext(req, region, nil)
tikvrpc.SetContextNoAttach(req, region, nil)
return req
}

Expand Down
3 changes: 2 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,8 @@ func (s *RegionRequestSender) SendReqCtx(
if req.InputRequestSource != "" && s.replicaSelector != nil {
patchRequestSource(req, s.replicaSelector.replicaType())
}
if err := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); err != nil {
// RPCClient.SendRequest will attach `req.Context` thus skip attaching here to reduce overhead.
if err := tikvrpc.SetContextNoAttach(req, rpcCtx.Meta, rpcCtx.Peer); err != nil {
return nil, nil, retryTimes, err
}
if s.replicaSelector != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (f *fnClient) CloseAddrVer(addr string, ver uint64) error {
func (f *fnClient) SetEventListener(listener client.ClientEventListener) {}

func (f *fnClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
tikvrpc.AttachContext(req, req.Context)
return f.fn(ctx, addr, req, timeout)
}

Expand Down
2 changes: 2 additions & 0 deletions internal/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,8 @@ func (c *RPCClient) checkArgs(ctx context.Context, addr string) (*Session, error

// SendRequest sends a request to mock cluster.
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
tikvrpc.AttachContext(req, req.Context)

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("RPCClient.SendRequest", opentracing.ChildOf(span.Context()))
defer span1.Finish()
Expand Down
2 changes: 2 additions & 0 deletions tikv/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func (c *CodecClient) SendRequest(ctx context.Context, addr string, req *tikvrpc
if err != nil {
return nil, err
}
// TODO(zyguan): since unistore does not attach context yet, here we attach the context manually to make integration tests pass.
tikvrpc.AttachContext(req, req.Context)
resp, err := c.Client.SendRequest(ctx, addr, req, timeout)
if err != nil {
return nil, err
Expand Down
91 changes: 91 additions & 0 deletions tikvrpc/cmds_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions tikvrpc/gen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,25 @@ cat <<EOF >> $output
return true
}
EOF

cat <<EOF >> $output
func isValidReqType(cmd CmdType) bool {
switch cmd {
EOF

for cmd in "${cmds[@]}"; do
cat <<EOF >> $output
case Cmd${cmd}:
return true
EOF
done

cat <<EOF >> $output
case CmdCopStream, CmdMPPTask, CmdMPPConn, CmdMPPCancel, CmdMPPAlive, CmdEmpty:
return true
default:
return false
}
}
EOF
15 changes: 15 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,8 @@ func AttachContext(req *Request, rpcCtx kvrpcpb.Context) bool {
}

// SetContext set the Context field for the given req to the specified ctx.
//
// Deprecated: use SetContextNoAttach instead, RPCClient will call AttachContext(req, req.Context).
func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
if region != nil {
req.Context.RegionId = region.Id
Expand All @@ -818,6 +820,19 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
return nil
}

// SetContextNoAttach likes SetContext, but it doesn't attach the context to the underlying request.
func SetContextNoAttach(req *Request, region *metapb.Region, peer *metapb.Peer) error {
if !isValidReqType(req.Type) {
return errors.Errorf("invalid request type %v", req.Type)
}
if region != nil {
req.Context.RegionId = region.Id
req.Context.RegionEpoch = region.RegionEpoch
}
req.Context.Peer = peer
return nil
}

// GenRegionErrorResp returns corresponding Response with specified RegionError
// according to the given req.
func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
Expand Down

0 comments on commit 2076492

Please sign in to comment.