Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl notifier: use pessimistic txn and fix updating memory state too early #59157

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ddl/notifier/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ go_test(
],
embed = [":notifier"],
flaky = True,
shard_count = 10,
shard_count = 12,
deps = [
"//pkg/ddl",
"//pkg/ddl/session",
Expand Down
24 changes: 18 additions & 6 deletions pkg/ddl/notifier/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type Store interface {
se *sess.Session,
ddlJobID int64,
multiSchemaChangeID int64,
processedBy uint64,
oldProcessedBy uint64,
newProcessedBy uint64,
) error
DeleteAndCommit(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error
// List will start a transaction of given session and read all schema changes
Expand Down Expand Up @@ -87,17 +88,28 @@ func (t *tableStore) UpdateProcessed(
se *sess.Session,
ddlJobID int64,
multiSchemaChangeID int64,
processedBy uint64,
oldProcessedBy uint64,
newProcessedBy uint64,
) error {
sql := fmt.Sprintf(`
UPDATE %s.%s
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
SET processed_by_flag = %d
WHERE ddl_job_id = %d AND sub_job_id = %d`,
WHERE ddl_job_id = %d AND sub_job_id = %d AND processed_by_flag = %d`,
t.db, t.table,
processedBy,
ddlJobID, multiSchemaChangeID)
newProcessedBy,
ddlJobID, multiSchemaChangeID, oldProcessedBy,
)
_, err := se.Execute(ctx, sql, "ddl_notifier")
return err
if err != nil {
return errors.Trace(err)
}
if se.GetSessionVars().StmtCtx.AffectedRows() == 0 {
return errors.Errorf(
"failed to update processed_by_flag, maybe the row has been updated by other owner. ddl_job_id: %d, sub_job_id: %d",
ddlJobID, multiSchemaChangeID,
)
}
return nil
}

// DeleteAndCommit implements Store interface.
Expand Down
24 changes: 12 additions & 12 deletions pkg/ddl/notifier/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,20 @@ func (n *DDLNotifier) processEventForHandler(
if (change.processedByFlag & (1 << handlerID)) != 0 {
return nil
}
newFlag := change.processedByFlag | (1 << handlerID)

if err = session.Begin(ctx); err != nil {
if err = session.BeginPessimistic(ctx); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the target issue that using a pessimistic txn addresses?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a pessimistic txn, errors can be found earlier, such as when the SQL is executed, so the handler can see the error and process it. Otherwise the error is only reported at COMMIT, and with the current handler interface it can't be processed.

Copy link
Contributor

@D3Hunter D3Hunter Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in pessimistic txn, errors can be found earlier

For an UPDATE, if the affected rows are not checked, there is actually no error in a pessimistic txn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original problem is that the handler uses the txn to insert some duplicate rows. In an optimistic txn, there's no error after executing the INSERT. With a pessimistic txn, the handler can see the "duplicate entry" error. #58980 is related.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also check the affected rows in bef13bf. The need for it was revealed by an old UT after switching to the pessimistic txn.

return errors.Trace(err)
}
defer func() {
if err == nil {
err = errors.Trace(session.Commit(ctx))
} else {
if err != nil {
session.Rollback()
return
}

err = errors.Trace(session.Commit(ctx))
if err == nil {
change.processedByFlag = newFlag
}
}()

Expand All @@ -293,19 +298,14 @@ func (n *DDLNotifier) processEventForHandler(
zap.Duration("duration", time.Since(now)))
}

newFlag := change.processedByFlag | (1 << handlerID)
if err = n.store.UpdateProcessed(
return errors.Trace(n.store.UpdateProcessed(
ctx,
session,
change.ddlJobID,
change.subJobID,
change.processedByFlag,
newFlag,
); err != nil {
return errors.Trace(err)
}
change.processedByFlag = newFlag

return nil
))
}

// Stop stops the background loop.
Expand Down
107 changes: 104 additions & 3 deletions pkg/ddl/notifier/testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,9 @@ func Test2OwnerForAShortTime(t *testing.T) {

s := notifier.OpenTableStore("test", ddl.NotifierTableName)
sessionPool := util.NewSessionPool(
1,
4,
func() (pools.Resource, error) {
return tk.Session(), nil
return testkit.NewTestKit(t, store).Session(), nil
},
nil,
nil,
Expand Down Expand Up @@ -360,7 +360,7 @@ func Test2OwnerForAShortTime(t *testing.T) {
if !bytes.Contains(content, []byte("Error processing change")) {
return false
}
return bytes.Contains(content, []byte("Write conflict"))
return bytes.Contains(content, []byte("maybe the row has been updated by other owner"))
}, time.Second, 25*time.Millisecond)
// the handler should not commit
tk2.MustQuery("SELECT * FROM test.result").Check(testkit.Rows())
Expand Down Expand Up @@ -489,3 +489,104 @@ func TestBeginTwice(t *testing.T) {
require.NoError(t, err)
require.NotContains(t, string(content), "context provider not set")
}

// TestHandlersSeePessimisticTxnError registers two handlers for the same event:
//  1. One always fails
//  2. One always succeeds
//
// It makes sure the event doesn't get lost after the second handler succeeds,
// i.e. the event row is never removed from the notifier table while the first
// handler keeps failing.
func TestHandlersSeePessimisticTxnError(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("USE test")
	tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
	tk.MustExec(ddl.NotifierTableSQL)
	ctx := context.Background()
	s := notifier.OpenTableStore("test", ddl.NotifierTableName)
	sessionPool := util.NewSessionPool(
		4,
		func() (pools.Resource, error) {
			return testkit.NewTestKit(t, store).Session(), nil
		},
		nil,
		nil,
	)
	n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
	// Always fails
	failHandler := func(_ context.Context, sctx sessionctx.Context, _ *notifier.SchemaChangeEvent) error {
		// Mock a duplicate key error. The values (1, -1) match the job ID and
		// sub-job ID of the event published below.
		_, err := sctx.GetSQLExecutor().Execute(ctx, "INSERT INTO test."+ddl.NotifierTableName+" VALUES(1, -1, 'some', 0)")
		return err
	}
	// Always succeeds
	successHandler := func(context.Context, sessionctx.Context, *notifier.SchemaChangeEvent) error {
		return nil
	}
	n.RegisterHandler(2, successHandler)
	n.RegisterHandler(1, failHandler)
	n.OnBecomeOwner()
	// Stop the notifier when the test ends, even if an assertion fails, so its
	// background processing does not outlive the test.
	defer n.OnRetireOwner()

	tk2 := testkit.NewTestKit(t, store)
	se := sess.NewSession(tk2.Session())
	event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: ast.NewCIStr("t1")})
	err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
	require.NoError(t, err)
	// The event must still be listed for the whole observation window.
	require.Never(t, func() bool {
		changes := make([]*notifier.SchemaChange, 8)
		result, closeFn := s.List(ctx, se)
		count, err2 := result.Read(changes)
		// Release the listing before asserting, so a failed read does not skip
		// closeFn (presumably it ends the transaction started by List).
		closeFn()
		require.NoError(t, err2)
		return count == 0
	}, time.Second, 50*time.Millisecond)
}

// TestCommitFailed makes sure events don't get lost if the internal txn commit
// failed. The handler updates a table and then the table is truncated from
// another session before the notifier commits.
func TestCommitFailed(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("USE test")
	tk.MustExec("set global tidb_enable_metadata_lock=0")
	t.Cleanup(func() {
		tk.MustExec("set global tidb_enable_metadata_lock=1")
	})
	tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
	tk.MustExec(ddl.NotifierTableSQL)
	tk.MustExec("CREATE TABLE subscribe_table (id INT PRIMARY KEY, c INT)")
	tk.MustExec("INSERT INTO subscribe_table VALUES (1, 1)")

	ctx := context.Background()
	s := notifier.OpenTableStore("test", ddl.NotifierTableName)
	sessionPool := util.NewSessionPool(
		4,
		func() (pools.Resource, error) {
			return testkit.NewTestKit(t, store).Session(), nil
		},
		nil,
		nil,
	)
	n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
	handler := func(_ context.Context, sctx sessionctx.Context, _ *notifier.SchemaChangeEvent) error {
		// pessimistic + DDL will cause an "infoschema is changed" error at commit time.
		_, err := sctx.GetSQLExecutor().Execute(
			ctx, "UPDATE test.subscribe_table SET c = c + 1 WHERE id = 1",
		)
		require.NoError(t, err)

		tk.MustExec("TRUNCATE test.subscribe_table")
		return nil
	}
	n.RegisterHandler(notifier.TestHandlerID, handler)
	n.OnBecomeOwner()
	// Retire via defer so the notifier is stopped even when an assertion below
	// fails. Deferred calls run before t.Cleanup callbacks, so the handler
	// (which uses tk) is no longer being invoked when the cleanup reuses tk.
	defer n.OnRetireOwner()

	tk2 := testkit.NewTestKit(t, store)
	se := sess.NewSession(tk2.Session())
	event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: ast.NewCIStr("t1")})
	err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
	require.NoError(t, err)
	// The event must still be listed for the whole observation window.
	require.Never(t, func() bool {
		changes := make([]*notifier.SchemaChange, 8)
		result, closeFn := s.List(ctx, se)
		count, err2 := result.Read(changes)
		// Release the listing before asserting, so a failed read does not skip
		// closeFn (presumably it ends the transaction started by List).
		closeFn()
		require.NoError(t, err2)
		return count == 0
	}, time.Second, 50*time.Millisecond)
}
1 change: 1 addition & 0 deletions pkg/ddl/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//pkg/domain/infosync",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/sessionctx",
Expand Down
14 changes: 14 additions & 0 deletions pkg/ddl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
Expand Down Expand Up @@ -49,6 +50,19 @@ func (s *Session) Begin(ctx context.Context) error {
return nil
}

// BeginPessimistic enters a new transaction in pessimistic mode through the
// session's transaction manager and, on success, marks the session as being
// inside a transaction. The error from entering the transaction is returned
// unchanged.
func (s *Session) BeginPessimistic(ctx context.Context) error {
	req := &sessiontxn.EnterNewTxnRequest{
		Type:    sessiontxn.EnterNewTxnDefault,
		TxnMode: ast.Pessimistic,
	}
	if err := sessiontxn.GetTxnManager(s.Context).EnterNewTxn(ctx, req); err != nil {
		return err
	}
	s.GetSessionVars().SetInTxn(true)
	return nil
}

// Commit commits the transaction.
func (s *Session) Commit(ctx context.Context) error {
s.StmtCommit(ctx)
Expand Down
50 changes: 50 additions & 0 deletions pkg/ddl/session/session_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package session_test
import (
"context"
"testing"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl/session"
Expand Down Expand Up @@ -66,3 +67,52 @@ func TestSessionPool(t *testing.T) {
}
require.Equal(t, uint64(0), targetTS)
}

// TestPessimisticTxn checks that two sessions started with BeginPessimistic
// conflict at statement time: the second session's UPDATE on the same row must
// not return until the first session commits.
func TestPessimisticTxn(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t (a int primary key, b int)")
	tk.MustExec("insert into t values (1, 1)")

	resourcePool := pools.NewResourcePool(func() (pools.Resource, error) {
		newTk := testkit.NewTestKit(t, store)
		return newTk.Session(), nil
	}, 4, 4, 0)
	pool := session.NewSessionPool(resourcePool)
	ctx := context.Background()

	sessCtx, err := pool.Get()
	require.NoError(t, err)
	se := session.NewSession(sessCtx)
	sessCtx2, err := pool.Get()
	require.NoError(t, err)
	se2 := session.NewSession(sessCtx2)

	err = se.BeginPessimistic(ctx)
	require.NoError(t, err)
	err = se2.BeginPessimistic(ctx)
	require.NoError(t, err)
	_, err = se.Execute(ctx, "update test.t set b = b + 1 where a = 1", "ut")
	require.NoError(t, err)

	// The goroutine must not call require.* itself: FailNow is only valid on
	// the goroutine running the test, and aborting there would leave `done`
	// neither signalled nor closed, blocking the test forever. It records its
	// errors instead; they are asserted below after the channel is closed.
	var execErr, commitErr error
	done := make(chan struct{}, 1)
	go func() {
		defer close(done)
		_, execErr = se2.Execute(ctx, "update test.t set b = b + 1 where a = 1", "ut")
		if execErr != nil {
			return
		}
		// Signal that the UPDATE has returned.
		done <- struct{}{}
		commitErr = se2.Commit(ctx)
	}()

	time.Sleep(100 * time.Millisecond)
	// because this is a pessimistic transaction, the second transaction should be blocked
	require.Len(t, done, 0)
	err = se.Commit(ctx)
	require.NoError(t, err)
	// First receive: the UPDATE returned (or the goroutine bailed out and
	// closed the channel). Second receive: the goroutine has finished.
	<-done
	_, ok := <-done
	require.False(t, ok)
	// The close of `done` happens before the receive above observes it, so the
	// goroutine's writes to execErr/commitErr are visible here.
	require.NoError(t, execErr)
	require.NoError(t, commitErr)
	pool.Put(sessCtx)
	pool.Put(sessCtx2)
}