Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl notifier: use pessimistic txn and fix updating memory state too early #59157

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions pkg/ddl/notifier/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,20 @@ func (n *DDLNotifier) processEventForHandler(
if (change.processedByFlag & (1 << handlerID)) != 0 {
return nil
}
newFlag := change.processedByFlag | (1 << handlerID)

if err = session.Begin(ctx); err != nil {
if err = session.BeginPessimistic(ctx); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the target issue that using a pessimistic txn is meant to address?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a pessimistic txn, errors can be found earlier, such as when executing the SQL, so the handler can see the error and process it. Otherwise the error is only reported at COMMIT, and with the current handler interface it can't be processed.

Copy link
Contributor

@D3Hunter D3Hunter Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in pessimistic txn, errors can be found earlier

For an UPDATE, if there is no check on the affected rows, there is actually no error in a pessimistic txn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original problem is that a handler uses the txn to insert some duplicate rows. In an optimistic txn, there's no error after executing the INSERT. With a pessimistic txn, the handler can see the "duplicate entry" error. #58980 is related.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also checked the affected rows in bef13bf . It's revealed by old UT after switching to pessimistic txn.

return errors.Trace(err)
}
defer func() {
if err == nil {
err = errors.Trace(session.Commit(ctx))
} else {
if err != nil {
session.Rollback()
return
}

err = errors.Trace(session.Commit(ctx))
if err == nil {
change.processedByFlag = newFlag
}
}()

Expand All @@ -293,7 +298,6 @@ func (n *DDLNotifier) processEventForHandler(
zap.Duration("duration", time.Since(now)))
}

newFlag := change.processedByFlag | (1 << handlerID)
if err = n.store.UpdateProcessed(
ctx,
session,
Expand All @@ -303,7 +307,6 @@ func (n *DDLNotifier) processEventForHandler(
); err != nil {
return errors.Trace(err)
}
change.processedByFlag = newFlag

return nil
}
Expand Down
98 changes: 98 additions & 0 deletions pkg/ddl/notifier/testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,3 +489,101 @@ func TestBeginTwice(t *testing.T) {
require.NoError(t, err)
require.NotContains(t, string(content), "context provider not set")
}

// TestHandlersSeePessimisticTxnError registers two handlers for the same
// event: one whose INSERT always returns an error and one that always
// succeeds. It then publishes a single event and asserts that, for one
// second, the notifier table never becomes empty, i.e. the event is not
// dropped just because one of the handlers succeeded.
func TestHandlersSeePessimisticTxnError(t *testing.T) {
	// 1. One always fails
	// 2. One always succeeds
	// Make sure events don't get lost after the second handler succeeds.
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("USE test")
	tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
	tk.MustExec(ddl.NotifierTableSQL)
	ctx := context.Background()
	s := notifier.OpenTableStore("test", ddl.NotifierTableName)
	sessionPool := util.NewSessionPool(
		4,
		func() (pools.Resource, error) {
			return testkit.NewTestKit(t, store).Session(), nil
		},
		nil,
		nil,
	)
	n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
	// Always fails
	failHandler := func(_ context.Context, sctx sessionctx.Context, _ *notifier.SchemaChangeEvent) error {
		// Mock a duplicate key error
		_, err := sctx.GetSQLExecutor().Execute(ctx, "INSERT INTO test."+ddl.NotifierTableName+" VALUES(1, -1, 'some', 0)")
		return err
	}
	// Always succeeds
	successHandler := func(context.Context, sessionctx.Context, *notifier.SchemaChangeEvent) error {
		return nil
	}
	n.RegisterHandler(2, successHandler)
	n.RegisterHandler(1, failHandler)
	n.OnBecomeOwner()
	// Retire the notifier when the test returns (including on a failed
	// assertion) so it does not keep running after the test is over.
	defer n.OnRetireOwner()

	tk2 := testkit.NewTestKit(t, store)
	se := sess.NewSession(tk2.Session())
	event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: ast.NewCIStr("t1")})
	err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
	require.NoError(t, err)
	require.Never(t, func() bool {
		changes := make([]*notifier.SchemaChange, 8)
		result, closeFn := s.List(ctx, se)
		count, err2 := result.Read(changes)
		// Release the list result before asserting so that a read error
		// cannot skip the close.
		closeFn()
		require.NoError(t, err2)
		return count == 0
	}, time.Second, 50*time.Millisecond)
}

// TestCommitFailed checks that an event stays in the notifier table when the
// notifier's internal transaction cannot be committed. The handler updates a
// row of subscribe_table and then truncates that table from another session,
// which is expected to make the commit fail; the test then asserts that, for
// one second, the notifier table never becomes empty.
func TestCommitFailed(t *testing.T) {
	// Make sure events don't get lost if internal txn commit failed.
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("USE test")
	tk.MustExec("set global tidb_enable_metadata_lock=0")
	tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
	tk.MustExec(ddl.NotifierTableSQL)
	tk.MustExec("CREATE TABLE subscribe_table (id INT PRIMARY KEY, c INT)")
	tk.MustExec("INSERT INTO subscribe_table VALUES (1, 1)")

	ctx := context.Background()
	s := notifier.OpenTableStore("test", ddl.NotifierTableName)
	sessionPool := util.NewSessionPool(
		4,
		func() (pools.Resource, error) {
			return testkit.NewTestKit(t, store).Session(), nil
		},
		nil,
		nil,
	)
	n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
	handler := func(_ context.Context, sctx sessionctx.Context, _ *notifier.SchemaChangeEvent) error {
		// pessimistic + DDL will cause an "infoschema is changed" error at commit time.
		_, err := sctx.GetSQLExecutor().Execute(
			ctx, "UPDATE test.subscribe_table SET c = c + 1 WHERE id = 1",
		)
		if err != nil {
			// The handler is invoked by the notifier rather than by the test
			// function, so report the failure with Errorf (safe from any
			// goroutine) instead of a FailNow-based assertion.
			t.Errorf("UPDATE inside handler failed: %v", err)
			return err
		}

		tk.MustExec("TRUNCATE test.subscribe_table")
		return nil
	}
	n.RegisterHandler(notifier.TestHandlerID, handler)
	n.OnBecomeOwner()
	// Retire the notifier on every exit path, not only when all assertions
	// below pass.
	defer n.OnRetireOwner()

	tk2 := testkit.NewTestKit(t, store)
	se := sess.NewSession(tk2.Session())
	event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: ast.NewCIStr("t1")})
	err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
	require.NoError(t, err)
	require.Never(t, func() bool {
		changes := make([]*notifier.SchemaChange, 8)
		result, closeFn := s.List(ctx, se)
		count, err2 := result.Read(changes)
		// Release the list result before asserting so that a read error
		// cannot skip the close.
		closeFn()
		require.NoError(t, err2)
		return count == 0
	}, time.Second, 50*time.Millisecond)
}
14 changes: 14 additions & 0 deletions pkg/ddl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
Expand Down Expand Up @@ -49,6 +50,19 @@ func (s *Session) Begin(ctx context.Context) error {
return nil
}

// BeginPessimistic opens a new transaction on the session in pessimistic
// mode. It asks the session's transaction manager to enter a new transaction
// with TxnMode set to ast.Pessimistic and, on success, flags the session
// variables as being inside a transaction. Any error from the transaction
// manager is returned unchanged.
func (s *Session) BeginPessimistic(ctx context.Context) error {
	req := &sessiontxn.EnterNewTxnRequest{
		Type:    sessiontxn.EnterNewTxnDefault,
		TxnMode: ast.Pessimistic,
	}
	if err := sessiontxn.GetTxnManager(s.Context).EnterNewTxn(ctx, req); err != nil {
		return err
	}

	s.GetSessionVars().SetInTxn(true)
	return nil
}

// Commit commits the transaction.
func (s *Session) Commit(ctx context.Context) error {
s.StmtCommit(ctx)
Expand Down
50 changes: 50 additions & 0 deletions pkg/ddl/session/session_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package session_test
import (
"context"
"testing"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl/session"
Expand Down Expand Up @@ -66,3 +67,52 @@ func TestSessionPool(t *testing.T) {
}
require.Equal(t, uint64(0), targetTS)
}

// TestPessimisticTxn verifies that transactions started with
// Session.BeginPessimistic block each other on a row conflict: two sessions
// update the same row, and the second update must not complete until the
// first session commits.
func TestPessimisticTxn(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t (a int primary key, b int)")
	tk.MustExec("insert into t values (1, 1)")

	resourcePool := pools.NewResourcePool(func() (pools.Resource, error) {
		newTk := testkit.NewTestKit(t, store)
		return newTk.Session(), nil
	}, 4, 4, 0)
	pool := session.NewSessionPool(resourcePool)
	ctx := context.Background()

	sessCtx, err := pool.Get()
	require.NoError(t, err)
	se := session.NewSession(sessCtx)
	sessCtx2, err := pool.Get()
	require.NoError(t, err)
	se2 := session.NewSession(sessCtx2)

	err = se.BeginPessimistic(ctx)
	require.NoError(t, err)
	err = se2.BeginPessimistic(ctx)
	require.NoError(t, err)
	_, err = se.Execute(ctx, "update test.t set b = b + 1 where a = 1", "ut")
	require.NoError(t, err)

	// done receives one value once se2's update has gone through and is
	// closed when the goroutine exits.
	done := make(chan struct{}, 1)
	go func() {
		// Close on every exit path so the receives below can never block
		// forever if the update or commit fails.
		defer close(done)
		// FailNow-based assertions must only run on the test goroutine, so
		// failures here are reported with Errorf instead.
		if _, err := se2.Execute(ctx, "update test.t set b = b + 1 where a = 1", "ut"); err != nil {
			t.Errorf("update in second session failed: %v", err)
			return
		}
		done <- struct{}{}
		if err := se2.Commit(ctx); err != nil {
			t.Errorf("commit in second session failed: %v", err)
		}
	}()

	time.Sleep(100 * time.Millisecond)
	// because this is a pessimistic transaction, the second transaction should be blocked
	require.Len(t, done, 0)
	err = se.Commit(ctx)
	require.NoError(t, err)
	// First receive: the second session's update went through (or the
	// goroutine bailed out and closed the channel after reporting an error).
	<-done
	// Second receive: the channel is closed, i.e. the goroutine has finished.
	_, ok := <-done
	require.False(t, ok)
	pool.Put(sessCtx)
	pool.Put(sessCtx2)
}