From eed6cc4e1580a2328fcf7dc8d9850555ea041ed8 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:08:12 +0800 Subject: [PATCH 1/2] *: Update client-go and verify all read ts (#58054) ref pingcap/tidb#57786 --- DEPS.bzl | 4 +-- executor/set.go | 3 +- go.mod | 2 +- go.sum | 4 +-- planner/core/planbuilder.go | 4 +-- sessionctx/context.go | 9 +++-- sessiontxn/staleread/processor.go | 2 +- sessiontxn/staleread/util.go | 2 +- store/copr/BUILD.bazel | 1 + store/copr/batch_coprocessor.go | 2 +- store/copr/batch_request_sender.go | 5 +-- store/copr/mpp.go | 2 +- util/mock/BUILD.bazel | 1 + util/mock/context.go | 57 ++++++++++++++++++++++++++---- 14 files changed, 74 insertions(+), 24 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 322bc9d8025ce..d05822f7300ee 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3603,8 +3603,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:P6bhZG2yFFuKYvOpfltUbt89sbHohq4BAv2P4GB3fL8=", - version = "v2.0.4-0.20250109055446-ccec7efbf0f7", + sum = "h1:jP8ndFLhBYYOLLmj5JDn3uWkilpeDt2NwDQBTXGZFcM=", + version = "v2.0.4-0.20250116080755-ec354dcd0e2b", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/executor/set.go b/executor/set.go index da6e1a58198b7..7603f6bf29040 100644 --- a/executor/set.go +++ b/executor/set.go @@ -197,7 +197,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres newSnapshotTS := getSnapshotTSByName() newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS if newSnapshotIsSet { - err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx.GetStore(), newSnapshotTS) + isStaleRead := name == variable.TiDBTxnReadTS + err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx.GetStore(), newSnapshotTS, isStaleRead) if name != variable.TiDBTxnReadTS { // Also check gc safe point for snapshot read. // We don't check snapshot with gc safe point for read_ts diff --git a/go.mod b/go.mod index b891b04dabb67..51e56bc3007a2 100644 --- a/go.mod +++ b/go.mod @@ -90,7 +90,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.4-0.20250109055446-ccec7efbf0f7 + github.com/tikv/client-go/v2 v2.0.4-0.20250116080755-ec354dcd0e2b github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index a8833b4242b28..07f09af7738a4 100644 --- a/go.sum +++ b/go.sum @@ -948,8 +948,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.4-0.20250109055446-ccec7efbf0f7 h1:P6bhZG2yFFuKYvOpfltUbt89sbHohq4BAv2P4GB3fL8= -github.com/tikv/client-go/v2 v2.0.4-0.20250109055446-ccec7efbf0f7/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ= +github.com/tikv/client-go/v2 v2.0.4-0.20250116080755-ec354dcd0e2b h1:jP8ndFLhBYYOLLmj5JDn3uWkilpeDt2NwDQBTXGZFcM= +github.com/tikv/client-go/v2 v2.0.4-0.20250116080755-ec354dcd0e2b/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ= github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 h1:e4hLUKfgfPeJPZwOfU+/I/03G0sn6IZqVcbX/5o+hvM= github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05/go.mod h1:MLIl+d2WbOF4A3U88WKtyXrQQW417wZDDvBcq2IW9bQ= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index ea3417f664d5e..5c97007f47de5 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3399,7 +3399,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil { return nil, err } p.StaleTxnStartTS = startTS @@ -3413,7 +3413,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil { return nil, err } p.StaleTxnStartTS = startTS diff --git a/sessionctx/context.go b/sessionctx/context.go index 281c3f8d24a19..43548dc3bed5f 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -219,9 +219,12 @@ const ( LastExecuteDDL basicCtxType = 3 ) -// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp -func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error { - return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) +// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp. +// For read requests to the storage, the check can be implicitly performed when sending the RPC request. So this +// function is only needed when it's not proper to delay the check to when RPC requests are being sent (e.g., `BEGIN` +// statements that don't make reading operation immediately). +func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64, isStaleRead bool) error { + return store.GetOracle().ValidateReadTS(ctx, readTS, isStaleRead, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) } // SysProcTracker is used to track background sys processes diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index 278d1b158a599..3423978972369 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -285,7 +285,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as return 0, err } - if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil { + if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts, true); err != nil { return 0, err } diff --git a/sessiontxn/staleread/util.go b/sessiontxn/staleread/util.go index 30a446bbb1817..d9d8ebef61be3 100644 --- a/sessiontxn/staleread/util.go +++ b/sessiontxn/staleread/util.go @@ -84,7 +84,7 @@ func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context, // If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that // reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability). // So in this case, do an extra check on it. - err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS) + err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS, true) if err != nil { return 0, err } diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel index 1d3238f13e93a..2ef2e7ecdf145 100644 --- a/store/copr/BUILD.bazel +++ b/store/copr/BUILD.bazel @@ -46,6 +46,7 @@ go_library( "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//error", "@com_github_tikv_client_go_v2//metrics", + "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//txnkv/txnlock", diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 6f8923344fe9c..5aa6631a82682 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -869,7 +869,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash. func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) { - sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.enableCollectExecutionInfo) + sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.store.store.GetOracle(), b.enableCollectExecutionInfo) var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos)) for _, ri := range task.regionInfos { regionInfos = append(regionInfos, ri.toCoprocessorRegionInfo()) diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index b976d26a59ab3..3552be8d3516b 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/metapb" tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "google.golang.org/grpc/codes" @@ -55,9 +56,9 @@ type RegionBatchRequestSender struct { } // NewRegionBatchRequestSender creates a RegionBatchRequestSender object. -func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, enableCollectExecutionInfo bool) *RegionBatchRequestSender { +func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, oracle oracle.Oracle, enableCollectExecutionInfo bool) *RegionBatchRequestSender { return &RegionBatchRequestSender{ - RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client), + RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client, oracle), enableCollectExecutionInfo: enableCollectExecutionInfo, } } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index ca1976885a003..0c3a0b011a677 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -266,7 +266,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req // Or else it's the task without region, which always happens in high layer task without table. // In that case if originalTask != nil { - sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.enableCollectExecutionInfo) + sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.store.store.GetOracle(), m.enableCollectExecutionInfo) rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) // No matter what the rpc error is, we won't retry the mpp dispatch tasks. // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling. diff --git a/util/mock/BUILD.bazel b/util/mock/BUILD.bazel index 78ba5291cbe8b..98385571c38da 100644 --- a/util/mock/BUILD.bazel +++ b/util/mock/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//sessionctx/variable", "//util", "//util/disk", + "//util/logutil", "//util/memory", "//util/sli", "//util/sqlexec", diff --git a/util/mock/context.go b/util/mock/context.go index 3445e73d4b603..75d702c67b10a 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/disk" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" @@ -67,7 +68,7 @@ type wrapTxn struct { } func (txn *wrapTxn) validOrPending() bool { - return txn.tsFuture != nil || txn.Transaction.Valid() + return txn.tsFuture != nil || (txn.Transaction != nil && txn.Transaction.Valid()) } func (txn *wrapTxn) pending() bool { @@ -173,7 +174,15 @@ func (c *Context) GetSessionVars() *variable.SessionVars { } // Txn implements sessionctx.Context Txn interface. -func (c *Context) Txn(bool) (kv.Transaction, error) { +func (c *Context) Txn(active bool) (kv.Transaction, error) { + if active { + if !c.txn.validOrPending() { + err := c.newTxn(context.Background()) + if err != nil { + return nil, err + } + } + } return &c.txn, nil } @@ -253,10 +262,12 @@ func (c *Context) GetPlanCache(_ bool) sessionctx.PlanCache { return c.pcache } -// NewTxn implements the sessionctx.Context interface. -func (c *Context) NewTxn(context.Context) error { +// newTxn Creates new transaction on the session context. +func (c *Context) newTxn(ctx context.Context) error { if c.Store == nil { - return errors.New("store is not set") + logutil.Logger(ctx).Warn("mock.Context: No store is specified when trying to create new transaction. A fake transaction will be created. Note that this is unrecommended usage.") + c.fakeTxn() + return nil } if c.txn.Valid() { err := c.txn.Commit(c.ctx) @@ -273,14 +284,46 @@ func (c *Context) NewTxn(context.Context) error { return nil } +// fakeTxn is used to let some tests pass in the context without an available kv.Storage. Once usages to access +// transactions without a kv.Storage are removed, this type should also be removed. +// New code should never use this. +type fakeTxn struct { + // The inner should always be nil. + kv.Transaction + startTS uint64 +} + +func (t *fakeTxn) StartTS() uint64 { + return t.startTS +} + +func (*fakeTxn) SetDiskFullOpt(_ kvrpcpb.DiskFullOpt) {} + +func (*fakeTxn) SetOption(_ int, _ any) {} + +func (*fakeTxn) Get(ctx context.Context, _ kv.Key) ([]byte, error) { + // Check your implementation if you meet this error. It's dangerous if some calculation relies on the data but the + // read result is faked. + logutil.Logger(ctx).Warn("mock.Context: No store is specified but trying to access data from a transaction.") + return nil, nil +} + +func (*fakeTxn) Valid() bool { return true } + +func (c *Context) fakeTxn() { + c.txn.Transaction = &fakeTxn{ + startTS: 1, + } +} + // NewStaleTxnWithStartTS implements the sessionctx.Context interface. func (c *Context) NewStaleTxnWithStartTS(ctx context.Context, _ uint64) error { - return c.NewTxn(ctx) + return c.newTxn(ctx) } // RefreshTxnCtx implements the sessionctx.Context interface. func (c *Context) RefreshTxnCtx(ctx context.Context) error { - return errors.Trace(c.NewTxn(ctx)) + return errors.Trace(c.newTxn(ctx)) } // RefreshVars implements the sessionctx.Context interface. From a5aeccec4af6109aafb7f0547ff263a953d5a78b Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 22 Jan 2025 14:55:09 +0800 Subject: [PATCH 2/2] try to revert the change in mock Signed-off-by: ekexium --- util/mock/BUILD.bazel | 1 - util/mock/context.go | 57 ++++++------------------------------------- 2 files changed, 7 insertions(+), 51 deletions(-) diff --git a/util/mock/BUILD.bazel b/util/mock/BUILD.bazel index 98385571c38da..78ba5291cbe8b 100644 --- a/util/mock/BUILD.bazel +++ b/util/mock/BUILD.bazel @@ -21,7 +21,6 @@ go_library( "//sessionctx/variable", "//util", "//util/disk", - "//util/logutil", "//util/memory", "//util/sli", "//util/sqlexec", diff --git a/util/mock/context.go b/util/mock/context.go index 75d702c67b10a..3445e73d4b603 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/disk" - "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" @@ -68,7 +67,7 @@ type wrapTxn struct { } func (txn *wrapTxn) validOrPending() bool { - return txn.tsFuture != nil || (txn.Transaction != nil && txn.Transaction.Valid()) + return txn.tsFuture != nil || txn.Transaction.Valid() } func (txn *wrapTxn) pending() bool { @@ -174,15 +173,7 @@ func (c *Context) GetSessionVars() *variable.SessionVars { } // Txn implements sessionctx.Context Txn interface. -func (c *Context) Txn(active bool) (kv.Transaction, error) { - if active { - if !c.txn.validOrPending() { - err := c.newTxn(context.Background()) - if err != nil { - return nil, err - } - } - } +func (c *Context) Txn(bool) (kv.Transaction, error) { return &c.txn, nil } @@ -262,12 +253,10 @@ func (c *Context) GetPlanCache(_ bool) sessionctx.PlanCache { return c.pcache } -// newTxn Creates new transaction on the session context. -func (c *Context) newTxn(ctx context.Context) error { +// NewTxn implements the sessionctx.Context interface. +func (c *Context) NewTxn(context.Context) error { if c.Store == nil { - logutil.Logger(ctx).Warn("mock.Context: No store is specified when trying to create new transaction. A fake transaction will be created. Note that this is unrecommended usage.") - c.fakeTxn() - return nil + return errors.New("store is not set") } if c.txn.Valid() { err := c.txn.Commit(c.ctx) @@ -284,46 +273,14 @@ func (c *Context) newTxn(ctx context.Context) error { return nil } -// fakeTxn is used to let some tests pass in the context without an available kv.Storage. Once usages to access -// transactions without a kv.Storage are removed, this type should also be removed. -// New code should never use this. -type fakeTxn struct { - // The inner should always be nil. - kv.Transaction - startTS uint64 -} - -func (t *fakeTxn) StartTS() uint64 { - return t.startTS -} - -func (*fakeTxn) SetDiskFullOpt(_ kvrpcpb.DiskFullOpt) {} - -func (*fakeTxn) SetOption(_ int, _ any) {} - -func (*fakeTxn) Get(ctx context.Context, _ kv.Key) ([]byte, error) { - // Check your implementation if you meet this error. It's dangerous if some calculation relies on the data but the - // read result is faked. - logutil.Logger(ctx).Warn("mock.Context: No store is specified but trying to access data from a transaction.") - return nil, nil -} - -func (*fakeTxn) Valid() bool { return true } - -func (c *Context) fakeTxn() { - c.txn.Transaction = &fakeTxn{ - startTS: 1, - } -} - // NewStaleTxnWithStartTS implements the sessionctx.Context interface. func (c *Context) NewStaleTxnWithStartTS(ctx context.Context, _ uint64) error { - return c.newTxn(ctx) + return c.NewTxn(ctx) } // RefreshTxnCtx implements the sessionctx.Context interface. func (c *Context) RefreshTxnCtx(ctx context.Context) error { - return errors.Trace(c.newTxn(ctx)) + return errors.Trace(c.NewTxn(ctx)) } // RefreshVars implements the sessionctx.Context interface.