Skip to content

Commit

Permalink
ddl: reuse backend for DXF subtasks of same step (#59165)
Browse files Browse the repository at this point in the history
ref #57229, ref #57497
  • Loading branch information
D3Hunter authored Jan 24, 2025
1 parent 9f07f52 commit c7ce790
Show file tree
Hide file tree
Showing 16 changed files with 197 additions and 91 deletions.
16 changes: 15 additions & 1 deletion pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/expression/exprctx"
"github.com/pingcap/tidb/pkg/expression/exprstatic"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -735,9 +737,21 @@ func (dc *ddlCtx) addIndexWithLocalIngest(
hasUnique = hasUnique || indexInfo.Unique
}

var (
cfg *local.BackendConfig
bd *local.Backend
err error
)
if config.GetGlobalConfig().Store == config.StoreTypeTiKV {
cfg, bd, err = ingest.CreateLocalBackend(ctx, dc.store, job, false)
if err != nil {
return errors.Trace(err)
}
defer bd.Close()
}
bcCtx, err := ingest.NewBackendCtxBuilder(ctx, dc.store, job).
WithCheckpointManagerParam(sessPool, reorgInfo.PhysicalTableID).
Build()
Build(cfg, bd)
if err != nil {
return errors.Trace(err)
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/meta/model"
Expand All @@ -40,6 +41,7 @@ type cloudImportExecutor struct {
ptbl table.PhysicalTable
cloudStoreURI string
backendCtx ingest.BackendCtx
backend *local.Backend
}

func newCloudImportExecutor(
Expand All @@ -60,10 +62,16 @@ func newCloudImportExecutor(

func (m *cloudImportExecutor) Init(ctx context.Context) error {
logutil.Logger(ctx).Info("cloud import executor init subtask exec env")
bCtx, err := ingest.NewBackendCtxBuilder(ctx, m.store, m.job).Build()
cfg, bd, err := ingest.CreateLocalBackend(ctx, m.store, m.job, false)
if err != nil {
return errors.Trace(err)
}
bCtx, err := ingest.NewBackendCtxBuilder(ctx, m.store, m.job).Build(cfg, bd)
if err != nil {
bd.Close()
return err
}
m.backend = bd
m.backendCtx = bCtx
return nil
}
Expand Down Expand Up @@ -159,5 +167,6 @@ func (m *cloudImportExecutor) Cleanup(ctx context.Context) error {
if m.backendCtx != nil {
m.backendCtx.Close()
}
m.backend.Close()
return nil
}
22 changes: 19 additions & 3 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/table"
Expand All @@ -54,6 +56,8 @@ type readIndexExecutor struct {
curRowCount *atomic.Int64

subtaskSummary sync.Map // subtaskID => readIndexSummary
backendCfg *local.BackendConfig
backend *local.Backend
}

type readIndexSummary struct {
Expand Down Expand Up @@ -82,8 +86,17 @@ func newReadIndexExecutor(
}, nil
}

func (*readIndexExecutor) Init(_ context.Context) error {
func (r *readIndexExecutor) Init(ctx context.Context) error {
logutil.DDLLogger().Info("read index executor init subtask exec env")
cfg := config.GetGlobalConfig()
if cfg.Store == config.StoreTypeTiKV {
cfg, bd, err := ingest.CreateLocalBackend(ctx, r.d.store, r.job, false)
if err != nil {
return errors.Trace(err)
}
r.backendCfg = cfg
r.backend = bd
}
return nil
}

Expand Down Expand Up @@ -119,7 +132,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
// TODO(tangenta): support checkpoint manager that interact with subtask table.
bCtx, err := ingest.NewBackendCtxBuilder(ctx, r.d.store, r.job).
WithImportDistributedLock(r.d.etcdCli, sm.TS).
Build()
Build(r.backendCfg, r.backend)
if err != nil {
return err
}
Expand Down Expand Up @@ -151,8 +164,11 @@ func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
}
}

func (*readIndexExecutor) Cleanup(ctx context.Context) error {
func (r *readIndexExecutor) Cleanup(ctx context.Context) error {
tidblogutil.Logger(ctx).Info("read index executor cleanup subtask exec env")
if r.backend != nil {
r.backend.Close()
}
return nil
}

Expand Down
31 changes: 24 additions & 7 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
litconfig "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/metabuild"
Expand Down Expand Up @@ -2440,25 +2441,41 @@ func (w *worker) addTableIndex(
}

func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, store kv.Storage) (err error) {
var bc ingest.BackendCtx
var (
backendCtx ingest.BackendCtx
cfg *local.BackendConfig
backend *local.Backend
)
defer func() {
if backendCtx != nil {
backendCtx.Close()
}
if backend != nil {
backend.Close()
}
}()
for _, elem := range reorgInfo.elements {
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, elem.ID)
if indexInfo == nil {
return errors.New("unexpected error, can't find index info")
}
if indexInfo.Unique {
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.NewBackendCtxBuilder(ctx, store, reorgInfo.Job).
if backendCtx == nil {
if config.GetGlobalConfig().Store == config.StoreTypeTiKV {
cfg, backend, err = ingest.CreateLocalBackend(ctx, store, reorgInfo.Job, true)
if err != nil {
return errors.Trace(err)
}
}
backendCtx, err = ingest.NewBackendCtxBuilder(ctx, store, reorgInfo.Job).
ForDuplicateCheck().
Build()
Build(cfg, backend)
if err != nil {
return err
}
//nolint:revive,all_revive
defer bc.Close()
}
err = bc.CollectRemoteDuplicateRows(indexInfo.ID, t)
err = backendCtx.CollectRemoteDuplicateRows(indexInfo.ID, t)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//servicediscovery",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_exp//maps",
"@org_uber_go_atomic//:atomic",
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type litBackendCtx struct {
memRoot MemRoot
jobID int64
tbl table.Table
// litBackendCtx doesn't manage the lifecycle of backend, caller should do it.
backend *local.Backend
ctx context.Context
cfg *local.BackendConfig
Expand Down Expand Up @@ -319,14 +320,14 @@ func (bc *litBackendCtx) unsafeImportAndReset(ctx context.Context, ei *engineInf
}

// ForceSyncFlagForTest is a flag to force sync only for test.
var ForceSyncFlagForTest = false
var ForceSyncFlagForTest atomic.Bool

func (bc *litBackendCtx) checkFlush() (shouldFlush bool, shouldImport bool) {
failpoint.Inject("forceSyncFlagForTest", func() {
// used in a manual test
ForceSyncFlagForTest = true
ForceSyncFlagForTest.Store(true)
})
if ForceSyncFlagForTest {
if ForceSyncFlagForTest.Load() {
return true, true
}
LitDiskRoot.UpdateUsage()
Expand Down Expand Up @@ -359,7 +360,6 @@ func (bc *litBackendCtx) Close() {
logutil.Logger(bc.ctx).Info(LitInfoCloseBackend, zap.Int64("jobID", bc.jobID),
zap.Int64("current memory usage", LitMemRoot.CurrentUsage()),
zap.Int64("max memory quota", LitMemRoot.MaxMemoryQuota()))
bc.backend.Close()
LitDiskRoot.Remove(bc.jobID)
BackendCounterForTest.Dec()
}
Expand Down
94 changes: 49 additions & 45 deletions pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
sd "github.com/tikv/pd/client/servicediscovery"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -94,29 +93,16 @@ func (b *BackendCtxBuilder) ForDuplicateCheck() *BackendCtxBuilder {
var BackendCounterForTest = atomic.Int64{}

// Build builds a BackendCtx.
func (b *BackendCtxBuilder) Build() (BackendCtx, error) {
func (b *BackendCtxBuilder) Build(cfg *local.BackendConfig, bd *local.Backend) (BackendCtx, error) {
ctx, store, job := b.ctx, b.store, b.job
sortPath, err := GenIngestTempDataDir()
jobSortPath, err := genJobSortPath(job.ID, b.checkDup)
if err != nil {
return nil, err
}
jobSortPath := filepath.Join(sortPath, encodeBackendTag(job.ID, b.checkDup))
intest.Assert(job.Type == model.ActionAddPrimaryKey ||
job.Type == model.ActionAddIndex)
intest.Assert(job.ReorgMeta != nil)

resGroupName := job.ReorgMeta.ResourceGroupName
concurrency := job.ReorgMeta.GetConcurrency()
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeed()
hasUnique, err := hasUniqueIndex(job)
if err != nil {
return nil, err
}
cfg, err := genConfig(ctx, jobSortPath, LitMemRoot, hasUnique, resGroupName, concurrency, maxWriteSpeed)
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", job.ID), zap.Error(err))
return nil, err
}
failpoint.Inject("beforeCreateLocalBackend", func() {
ResignOwnerForTest.Store(true)
})
Expand All @@ -141,45 +127,41 @@ func (b *BackendCtxBuilder) Build() (BackendCtx, error) {
return mockBackend, nil
}

discovery := pdCli.GetServiceDiscovery()
bd, err := createLocalBackend(ctx, cfg, discovery)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", job.ID), zap.Error(err))
return nil, err
}

bCtx := newBackendContext(ctx, job.ID, bd, cfg,
defaultImportantVariables, LitMemRoot, b.etcdClient, job.RealStartTS, b.importTS, cpMgr)

logutil.Logger(ctx).Info(LitInfoCreateBackend, zap.Int64("job ID", job.ID),
zap.Int64("current memory usage", LitMemRoot.CurrentUsage()),
zap.Int64("max memory quota", LitMemRoot.MaxMemoryQuota()),
zap.Bool("has unique index", hasUnique))

LitDiskRoot.Add(job.ID, bCtx)
BackendCounterForTest.Add(1)
return bCtx, nil
}

func hasUniqueIndex(job *model.Job) (bool, error) {
args, err := model.GetModifyIndexArgs(job)
func genJobSortPath(jobID int64, checkDup bool) (string, error) {
sortPath, err := GenIngestTempDataDir()
if err != nil {
return false, errors.Trace(err)
return "", err
}
return filepath.Join(sortPath, encodeBackendTag(jobID, checkDup)), nil
}

for _, a := range args.IndexArgs {
if a.Unique {
return true, nil
}
// CreateLocalBackend creates a local backend for adding index.
func CreateLocalBackend(ctx context.Context, store kv.Storage, job *model.Job, checkDup bool) (*local.BackendConfig, *local.Backend, error) {
jobSortPath, err := genJobSortPath(job.ID, checkDup)
if err != nil {
return nil, nil, err
}
return false, nil
}
intest.Assert(job.Type == model.ActionAddPrimaryKey ||
job.Type == model.ActionAddIndex)
intest.Assert(job.ReorgMeta != nil)

resGroupName := job.ReorgMeta.ResourceGroupName
concurrency := job.ReorgMeta.GetConcurrency()
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeed()
hasUnique, err := hasUniqueIndex(job)
if err != nil {
return nil, nil, err
}
cfg := genConfig(ctx, jobSortPath, LitMemRoot, hasUnique, resGroupName, concurrency, maxWriteSpeed)

func createLocalBackend(
ctx context.Context,
cfg *local.BackendConfig,
pdSvcDiscovery sd.ServiceDiscovery,
) (*local.Backend, error) {
tidbCfg := config.GetGlobalConfig()
tls, err := common.NewTLS(
tidbCfg.Security.ClusterSSLCA,
Expand All @@ -190,13 +172,35 @@ func createLocalBackend(
)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Error(err))
return nil, err
return nil, nil, err
}

ddllogutil.DDLIngestLogger().Info("create local backend for adding index",
zap.String("sortDir", cfg.LocalStoreDir),
zap.String("keyspaceName", cfg.KeyspaceName))
return local.NewBackend(ctx, tls, *cfg, pdSvcDiscovery)
zap.String("keyspaceName", cfg.KeyspaceName),
zap.Int64("job ID", job.ID),
zap.Int64("current memory usage", LitMemRoot.CurrentUsage()),
zap.Int64("max memory quota", LitMemRoot.MaxMemoryQuota()),
zap.Bool("has unique index", hasUnique))

//nolint: forcetypeassert
pdCli := store.(tikv.Storage).GetRegionCache().PDClient()
be, err := local.NewBackend(ctx, tls, *cfg, pdCli.GetServiceDiscovery())
return cfg, be, err
}

func hasUniqueIndex(job *model.Job) (bool, error) {
args, err := model.GetModifyIndexArgs(job)
if err != nil {
return false, errors.Trace(err)
}

for _, a := range args.IndexArgs {
if a.Unique {
return true, nil
}
}
return false, nil
}

const checkpointUpdateInterval = 10 * time.Minute
Expand Down
Loading

0 comments on commit c7ce790

Please sign in to comment.