From 566c63d91552fdba960df0f48668050009065cfb Mon Sep 17 00:00:00 2001
From: MyonKeminta
Date: Mon, 16 Dec 2024 15:49:23 +0800
Subject: [PATCH] Fix the problem that ttlManager may stop working if there is
 no need to lock after retrying aggressive locking

Signed-off-by: MyonKeminta
---
 integration_tests/2pc_test.go   | 76 +++++++++++++++++++++++++++++++++
 txnkv/transaction/test_probe.go |  5 +++
 txnkv/transaction/txn.go        | 10 ++++-
 3 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go
index fec8cfb6e5..305d6fb310 100644
--- a/integration_tests/2pc_test.go
+++ b/integration_tests/2pc_test.go
@@ -1520,6 +1520,82 @@ func (s *testCommitterSuite) TestAggressiveLockingExitIfInapplicable() {
 	s.NoError(txn.Rollback())
 }
 
+func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() {
+	// Not blocked
+	txn := s.begin()
+	txn.SetPessimistic(true)
+	txn.StartAggressiveLocking()
+	s.True(txn.IsInAggressiveLockingMode())
+	s.True(txn.GetCommitter().IsNil())
+
+	lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
+	s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
+	s.True(txn.GetCommitter().IsTTLRunning())
+
+	txn.CancelAggressiveLocking(context.Background())
+	s.False(txn.IsInAggressiveLockingMode())
+	s.False(txn.GetCommitter().IsTTLRunning())
+
+	// End the transaction to test the next case.
+	s.NoError(txn.Rollback())
+
+	// txn blocked by txn2
+	txn = s.begin()
+	txn.SetPessimistic(true)
+	txn.StartAggressiveLocking()
+	s.True(txn.IsInAggressiveLockingMode())
+	s.True(txn.GetCommitter().IsNil())
+
+	txn2 := s.begin()
+	txn2.SetPessimistic(true)
+	lockCtx2 := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()}
+	s.NoError(txn2.LockKeys(context.Background(), lockCtx2, []byte("k1")))
+
+	lockResCh := make(chan error)
+	go func() {
+		lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
+		lockResCh <- txn.LockKeys(context.Background(), lockCtx, []byte("k1"))
+	}()
+	// No immediate result as blocked by txn2
+	select {
+	case <-time.After(time.Millisecond * 100):
+	case err := <-lockResCh:
+		s.FailNowf("get lock result when expected to be blocked", "error: %+v", err)
+	}
+
+	s.NoError(txn2.Set([]byte("k1"), []byte("v1")))
+	s.NoError(txn2.Commit(context.Background()))
+
+	// txn is resumed
+	select {
+	case <-time.After(time.Second):
+		s.FailNow("txn not resumed after blocker is committed")
+	case err := <-lockResCh:
+		s.NoError(err)
+	}
+
+	s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
+	s.Greater(lockCtx.MaxLockedWithConflictTS, txn.StartTS())
+
+	s.True(txn.GetCommitter().IsTTLRunning())
+
+	txn.RetryAggressiveLocking(context.Background())
+	s.True(txn.GetCommitter().IsTTLRunning())
+
+	// Get a new ts as the new forUpdateTS.
+	forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	s.NoError(err)
+	lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
+	s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
+	s.True(txn.GetCommitter().IsTTLRunning())
+
+	txn.CancelAggressiveLocking(context.Background())
+	s.True(txn.GetCommitter().IsTTLRunning())
+
+	// End the test.
+	s.NoError(txn.Rollback())
+}
+
 // TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
 func (s *testCommitterSuite) TestElapsedTTL() {
 	key := []byte("key")

diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go
index 7e0a582e01..d3b049f9cf 100644
--- a/txnkv/transaction/test_probe.go
+++ b/txnkv/transaction/test_probe.go
@@ -152,6 +152,11 @@ type CommitterProbe struct {
 	*twoPhaseCommitter
 }
 
+// IsNil returns whether the internal twoPhaseCommitter is nil.
+func (c CommitterProbe) IsNil() bool {
+	return c.twoPhaseCommitter == nil
+}
+
 // InitKeysAndMutations prepares the committer for commit.
 func (c CommitterProbe) InitKeysAndMutations() error {
 	return c.initKeysAndMutations(context.Background())

diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go
index aeebceb1c9..6f83c41258 100644
--- a/txnkv/transaction/txn.go
+++ b/txnkv/transaction/txn.go
@@ -1006,8 +1006,8 @@ func (txn *KVTxn) RetryAggressiveLocking(ctx context.Context) {
 	}
 	txn.cleanupAggressiveLockingRedundantLocks(ctx)
 	if txn.aggressiveLockingContext.assignedPrimaryKey {
-		txn.resetPrimary()
 		txn.aggressiveLockingContext.assignedPrimaryKey = false
+		txn.aggressiveLockingContext.lastAssignedPrimaryKey = true
 	}
 
 	txn.aggressiveLockingContext.lastPrimaryKey = txn.aggressiveLockingContext.primaryKey
@@ -1401,10 +1401,17 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
 		metrics.AggressiveLockedKeysNew.Add(float64(len(keys)))
 
 		if len(keys) == 0 {
+			// Assume the primary key is not changed in this case. Keep the ttlManager running.
+			// Ref: https://github.com/pingcap/tidb/issues/58279
 			if lockCtx.Stats != nil {
 				txn.collectAggressiveLockingStats(lockCtx, 0, 0, filteredAggressiveLockedKeysCount, lockWakeUpMode)
 			}
 			return nil
+		} else if !bytes.Equal(txn.aggressiveLockingContext.primaryKey, txn.aggressiveLockingContext.lastPrimaryKey) {
+			// If the primary key has changed, restart the ttlManager so that it runs on the new primary key.
+			// Otherwise, keep it running. When the RPC requests are sent, ttlManager.run() will be called
+			// again, but it is reentrant and does nothing since the ttlManager is already running.
+			txn.committer.ttlManager.reset()
 		}
 	}
 
@@ -1595,6 +1602,7 @@ func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) {
 type aggressiveLockingContext struct {
 	lastRetryUnnecessaryLocks map[string]tempLockBufferEntry
 	lastPrimaryKey            []byte
+	lastAssignedPrimaryKey    bool
 	lastAttemptStartTime      time.Time
 
 	currentLockedKeys map[string]tempLockBufferEntry
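
Note for reviewers: the new comment in lockKeys relies on ttlManager.run() being reentrant, and on reset() allowing a later run() to start fresh on a new primary key. Below is a minimal, self-contained sketch of that state-machine pattern. It is illustrative only, not the actual client-go implementation: the state constants, the quit channel, and the one-second heartbeat interval are all assumptions made for the sketch.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Hypothetical states mirroring the idea behind client-go's ttlManager.
const (
	stateUninitialized int32 = iota
	stateRunning
)

type ttlManager struct {
	state int32
	quit  chan struct{}
}

// run starts the keep-alive loop for the given primary key. It is
// reentrant: if the manager is already running, the CAS fails and the
// call is a no-op, which is why the RPC path can call it repeatedly.
func (t *ttlManager) run(primary []byte) {
	if !atomic.CompareAndSwapInt32(&t.state, stateUninitialized, stateRunning) {
		return // already running: do nothing
	}
	quit := make(chan struct{})
	t.quit = quit
	go func() {
		ticker := time.NewTicker(time.Second) // heartbeat interval (illustrative)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fmt.Printf("heartbeat: extend TTL of primary %q\n", primary)
			case <-quit:
				return
			}
		}
	}()
}

// reset stops the loop so that a later run() can start fresh. This is
// the transition lockKeys performs when the primary key has changed
// between aggressive-locking attempts.
func (t *ttlManager) reset() {
	if !atomic.CompareAndSwapInt32(&t.state, stateRunning, stateUninitialized) {
		return // not running: nothing to stop
	}
	close(t.quit)
}

func main() {
	m := &ttlManager{}
	m.run([]byte("k1")) // starts the keep-alive loop
	m.run([]byte("k1")) // reentrant: no-op, loop already running
	time.Sleep(2500 * time.Millisecond)
	m.reset()           // primary changed between attempts: stop...
	m.run([]byte("k2")) // ...and restart on the new primary
	time.Sleep(2500 * time.Millisecond)
	m.reset()
}

The CAS on state is what makes repeated run() calls from the lock path harmless, while reset() is the only transition that permits a new keep-alive loop. That is why the fix can simply keep the ttlManager alive when there is nothing to lock, and only reset it when the primary key actually changes.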