Skip to content

Commit

Permalink
*: Update client-go and verify all read ts (#58934)
Browse files Browse the repository at this point in the history
ref #57786
  • Loading branch information
ekexium authored Jan 22, 2025
1 parent 6423868 commit 927aa59
Show file tree
Hide file tree
Showing 13 changed files with 24 additions and 18 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4179,8 +4179,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:E+JsLmxpa4rsT0rS9m52+FhiiRbQxZYn079T4JD12jU=",
version = "v2.0.8-0.20241204085508-80a6b021f0f6",
sum = "h1:zrgHZRnd7Veio1Q+jV9LVa/JAfBi2Y7fGx91s7Uw+fo=",
version = "v2.0.8-0.20250108151910-7b4409c11946",
)
go_repository(
name = "com_github_tikv_pd",
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4827,7 +4827,7 @@ func TestStaleReadAtFutureTime(t *testing.T) {

tk := testkit.NewTestKit(t, store)
// Setting tx_read_ts to a time in the future will fail. (One day before the 2038 problem)
tk.MustGetErrMsg("set @@tx_read_ts = '2038-01-18 03:14:07'", "cannot set read timestamp to a future time")
tk.MustContainErrMsg("set @@tx_read_ts = '2038-01-18 03:14:07'", "cannot set read timestamp to a future time")
// TxnReadTS Is not updated if check failed.
require.Zero(t, tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS())
}
Expand Down
3 changes: 2 additions & 1 deletion executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
newSnapshotTS := getSnapshotTSByName()
newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS
if newSnapshotIsSet {
err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx.GetStore(), newSnapshotTS)
isStaleRead := name == variable.TiDBTxnReadTS
err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx.GetStore(), newSnapshotTS, isStaleRead)
if name != variable.TiDBTxnReadTS {
// Also check gc safe point for snapshot read.
// We don't check snapshot with gc safe point for read_ts
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20241204085508-80a6b021f0f6
github.com/tikv/client-go/v2 v2.0.8-0.20250108151910-7b4409c11946
github.com/tikv/pd/client v0.0.0-20240725070735-fb162bf0aa3f
github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -968,8 +968,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20241204085508-80a6b021f0f6 h1:E+JsLmxpa4rsT0rS9m52+FhiiRbQxZYn079T4JD12jU=
github.com/tikv/client-go/v2 v2.0.8-0.20241204085508-80a6b021f0f6/go.mod h1:45NuHB8x+VAoztMIjF6hEgXvPQXhXWPfMxDg0N8CoRY=
github.com/tikv/client-go/v2 v2.0.8-0.20250108151910-7b4409c11946 h1:zrgHZRnd7Veio1Q+jV9LVa/JAfBi2Y7fGx91s7Uw+fo=
github.com/tikv/client-go/v2 v2.0.8-0.20250108151910-7b4409c11946/go.mod h1:45NuHB8x+VAoztMIjF6hEgXvPQXhXWPfMxDg0N8CoRY=
github.com/tikv/pd/client v0.0.0-20240725070735-fb162bf0aa3f h1:Szw9YxqGGEneSniBd4ep09jgB77cKUy+AuhKOmdGPdE=
github.com/tikv/pd/client v0.0.0-20240725070735-fb162bf0aa3f/go.mod h1:QCBn54O5lhfkYfxj8Tyiqaxue/mthHEMyi7AqJP/+n4=
github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e h1:MV6KaVu/hzByHP0UvJ4HcMGE/8a6A4Rggc/0wx2AvJo=
Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3568,7 +3568,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
if err != nil {
return nil, err
}
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil {
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
Expand All @@ -3582,7 +3582,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
if err != nil {
return nil, err
}
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil {
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
Expand Down
9 changes: 6 additions & 3 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,12 @@ const (
LastExecuteDDL basicCtxType = 3
)

// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp
func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error {
return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp.
// For read requests to the storage, the check can be implicitly performed when sending the RPC request. So this
// function is only needed when it's not proper to delay the check to when RPC requests are being sent (e.g., `BEGIN`
// statements that don't make reading operation immediately).
func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64, isStaleRead bool) error {
return store.GetOracle().ValidateReadTS(ctx, readTS, isStaleRead, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
}

// SysProcTracker is used to track background sys processes
Expand Down
2 changes: 1 addition & 1 deletion sessiontxn/staleread/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as
return 0, err
}

if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil {
if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts, true); err != nil {
return 0, err
}

Expand Down
2 changes: 1 addition & 1 deletion sessiontxn/staleread/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context,
// If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that
// reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability).
// So in this case, do an extra check on it.
err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS)
err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS, true)
if err != nil {
return 0, err
}
Expand Down
1 change: 1 addition & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_library(
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//metrics",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
Expand Down
2 changes: 1 addition & 1 deletion store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba
const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash.

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.enableCollectExecutionInfo)
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.store.store.GetOracle(), b.enableCollectExecutionInfo)
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, ri.toCoprocessorRegionInfo())
Expand Down
5 changes: 3 additions & 2 deletions store/copr/batch_request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -56,9 +57,9 @@ type RegionBatchRequestSender struct {
}

// NewRegionBatchRequestSender creates a RegionBatchRequestSender object.
func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, enableCollectExecutionInfo bool) *RegionBatchRequestSender {
func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, oracle oracle.Oracle, enableCollectExecutionInfo bool) *RegionBatchRequestSender {
return &RegionBatchRequestSender{
RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client),
RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client, oracle),
enableCollectExecutionInfo: enableCollectExecutionInfo,
}
}
Expand Down
2 changes: 1 addition & 1 deletion store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
// Or else it's the task without region, which always happens in high layer task without table.
// In that case
if originalTask != nil {
sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.enableCollectExecutionInfo)
sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.store.store.GetOracle(), m.enableCollectExecutionInfo)
rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
Expand Down

0 comments on commit 927aa59

Please sign in to comment.