Skip to content

Commit

Permalink
Fix the problem that ttlManager may stop working if no need to lock a…
Browse files Browse the repository at this point in the history
…fter retry aggressive locking

Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Dec 16, 2024
1 parent 0e4728c commit 566c63d
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 1 deletion.
76 changes: 76 additions & 0 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,82 @@ func (s *testCommitterSuite) TestAggressiveLockingExitIfInapplicable() {
s.NoError(txn.Rollback())
}

func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() {
// Not blocked
txn := s.begin()
txn.SetPessimistic(true)
txn.StartAggressiveLocking()
s.True(txn.IsInAggressiveLockingMode())
s.True(txn.GetCommitter().IsNil())

lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
s.True(txn.GetCommitter().IsTTLRunning())

txn.CancelAggressiveLocking(context.Background())
s.False(txn.IsInAggressiveLockingMode())
s.False(txn.GetCommitter().IsTTLRunning())

// End the transaction to test the next case.
s.NoError(txn.Rollback())

// txn blocked by txn2
txn = s.begin()
txn.SetPessimistic(true)
txn.StartAggressiveLocking()
s.True(txn.IsInAggressiveLockingMode())
s.True(txn.GetCommitter().IsNil())

txn2 := s.begin()
txn2.SetPessimistic(true)
lockCtx2 := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()}
s.NoError(txn2.LockKeys(context.Background(), lockCtx2, []byte("k1")))

lockResCh := make(chan error)
go func() {
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
lockResCh <- txn.LockKeys(context.Background(), lockCtx, []byte("k1"))
}()
// No immediate result as blocked by txn2
select {
case <-time.After(time.Millisecond * 100):
case err := <-lockResCh:
s.FailNowf("get lock result when expected to be blocked", "error: %+v", err)
}

s.NoError(txn2.Set([]byte("k1"), []byte("v1")))
s.NoError(txn2.Commit(context.Background()))

// txn is resumed
select {
case <-time.After(time.Second):
s.FailNow("txn not resumed after blocker is committed")
case err := <-lockResCh:
s.NoError(err)
}

s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Greater(lockCtx.MaxLockedWithConflictTS, txn.StartTS())

s.True(txn.GetCommitter().IsTTLRunning())

txn.RetryAggressiveLocking(context.Background())
s.True(txn.GetCommitter().IsTTLRunning())

// Get a new ts as the new forUpdateTS.
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
s.True(txn.GetCommitter().IsTTLRunning())

txn.CancelAggressiveLocking(context.Background())
s.True(txn.GetCommitter().IsTTLRunning())

// End the test.
s.NoError(txn.Rollback())
}

// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
func (s *testCommitterSuite) TestElapsedTTL() {
key := []byte("key")
Expand Down
5 changes: 5 additions & 0 deletions txnkv/transaction/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ type CommitterProbe struct {
*twoPhaseCommitter
}

// IsNil returns whether tie internal twoPhaseCommitter is nil.
func (c CommitterProbe) IsNil() bool {
return c.twoPhaseCommitter == nil
}

// InitKeysAndMutations prepares the committer for commit.
func (c CommitterProbe) InitKeysAndMutations() error {
return c.initKeysAndMutations(context.Background())
Expand Down
10 changes: 9 additions & 1 deletion txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,8 +1006,8 @@ func (txn *KVTxn) RetryAggressiveLocking(ctx context.Context) {
}
txn.cleanupAggressiveLockingRedundantLocks(ctx)
if txn.aggressiveLockingContext.assignedPrimaryKey {
txn.resetPrimary()
txn.aggressiveLockingContext.assignedPrimaryKey = false
txn.aggressiveLockingContext.lastAssignedPrimaryKey = true
}

txn.aggressiveLockingContext.lastPrimaryKey = txn.aggressiveLockingContext.primaryKey
Expand Down Expand Up @@ -1401,10 +1401,17 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
metrics.AggressiveLockedKeysNew.Add(float64(len(keys)))

if len(keys) == 0 {
// Assume the primary key is not changed in this case. Keep the ttlManager running.
// Ref: https://github.com/pingcap/tidb/issues/58279
if lockCtx.Stats != nil {
txn.collectAggressiveLockingStats(lockCtx, 0, 0, filteredAggressiveLockedKeysCount, lockWakeUpMode)
}
return nil
} else if !bytes.Equal(txn.aggressiveLockingContext.primaryKey, txn.aggressiveLockingContext.lastPrimaryKey) {
// If the primary key is changed, we need the ttlManager to be restarted to run on the new primary key.
// Otherwise, we do nothing but keep it running. When it sends the RPC requests, ttlManager.run()
// will be called, but it's reentrant and will do nothing as the ttlManager is already running.
txn.committer.ttlManager.reset()
}
}

Expand Down Expand Up @@ -1595,6 +1602,7 @@ func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) {
type aggressiveLockingContext struct {
lastRetryUnnecessaryLocks map[string]tempLockBufferEntry
lastPrimaryKey []byte
lastAssignedPrimaryKey bool
lastAttemptStartTime time.Time

currentLockedKeys map[string]tempLockBufferEntry
Expand Down

0 comments on commit 566c63d

Please sign in to comment.