From 91613a5214f36f0b00839bf648669f0d0b95ccbd Mon Sep 17 00:00:00 2001 From: wk989898 Date: Fri, 24 Jan 2025 17:05:11 +0800 Subject: [PATCH] rewrite --- .../worker/cloudstorage_dml_worker.go | 57 +++++----- .../{defragmenter => writer}/defragmenter.go | 28 +++-- downstreamadapter/worker/writer/worker.go | 81 +++++++++++++ .../writer.go} | 107 ++++-------------- 4 files changed, 150 insertions(+), 123 deletions(-) rename downstreamadapter/worker/{defragmenter => writer}/defragmenter.go (84%) create mode 100644 downstreamadapter/worker/writer/worker.go rename downstreamadapter/worker/{cloudstorage_worker.go => writer/writer.go} (79%) diff --git a/downstreamadapter/worker/cloudstorage_dml_worker.go b/downstreamadapter/worker/cloudstorage_dml_worker.go index f9886c40..75388c5d 100644 --- a/downstreamadapter/worker/cloudstorage_dml_worker.go +++ b/downstreamadapter/worker/cloudstorage_dml_worker.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/ticdc/utils/chann" "go.uber.org/zap" - "github.com/pingcap/ticdc/downstreamadapter/worker/defragmenter" + "github.com/pingcap/ticdc/downstreamadapter/worker/writer" commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/errors" @@ -48,18 +48,18 @@ type CloudStorageDMLWorker struct { // last sequence number lastSeqNum uint64 - // encodingWorkers defines a group of workers for encoding events. - encodingWorkers []*CloudStorageEncodingWorker - workers []*CloudStorageWorker + // workers defines a group of workers for encoding events. + workers []*writer.Worker + writers []*writer.Writer // defragmenter is used to defragment the out-of-order encoded messages and // sends encoded messages to individual dmlWorkers. - defragmenter *defragmenter.Defragmenter + defragmenter *writer.Defragmenter alive struct { sync.RWMutex // msgCh is a channel to hold eventFragment. // The caller of WriteEvents will write eventFragment to msgCh and // the encodingWorkers will read eventFragment from msgCh to encode events. - msgCh *chann.DrainableChann[defragmenter.EventFragment] + msgCh *chann.DrainableChann[writer.EventFragment] isDead bool } } @@ -73,28 +73,28 @@ func NewCloudStorageDMLWorker( statistics *metrics.Statistics, ) (*CloudStorageDMLWorker, error) { w := &CloudStorageDMLWorker{ - changefeedID: changefeedID, - storage: storage, - config: config, - statistics: statistics, - encodingWorkers: make([]*CloudStorageEncodingWorker, defaultEncodingConcurrency), - workers: make([]*CloudStorageWorker, config.WorkerCount), + changefeedID: changefeedID, + storage: storage, + config: config, + statistics: statistics, + workers: make([]*writer.Worker, defaultEncodingConcurrency), + writers: make([]*writer.Writer, config.WorkerCount), } - w.alive.msgCh = chann.NewAutoDrainChann[defragmenter.EventFragment]() - encodedOutCh := make(chan defragmenter.EventFragment, defaultChannelSize) - workerChannels := make([]*chann.DrainableChann[defragmenter.EventFragment], config.WorkerCount) + w.alive.msgCh = chann.NewAutoDrainChann[writer.EventFragment]() + encodedOutCh := make(chan writer.EventFragment, defaultChannelSize) + workerChannels := make([]*chann.DrainableChann[writer.EventFragment], config.WorkerCount) // create a group of encoding workers. 
for i := 0; i < defaultEncodingConcurrency; i++ { encoderBuilder, err := codec.NewTxnEventEncoder(encoderConfig) if err != nil { return nil, err } - w.encodingWorkers[i] = NewCloudStorageEncodingWorker(i, w.changefeedID, encoderBuilder, w.alive.msgCh.Out(), encodedOutCh) + w.workers[i] = writer.NewWorker(i, w.changefeedID, encoderBuilder, w.alive.msgCh.Out(), encodedOutCh) } // create a group of dml workers. for i := 0; i < w.config.WorkerCount; i++ { - inputCh := chann.NewAutoDrainChann[defragmenter.EventFragment]() - w.workers[i] = NewCloudStorageWorker(i, w.changefeedID, storage, config, extension, + inputCh := chann.NewAutoDrainChann[writer.EventFragment]() + w.writers[i] = writer.NewWriter(i, w.changefeedID, storage, config, extension, inputCh, w.statistics) workerChannels[i] = inputCh } @@ -102,7 +102,7 @@ func NewCloudStorageDMLWorker( // The defragmenter is used to defragment the out-of-order encoded messages from encoding workers and // sends encoded messages to related dmlWorkers in order. Messages of the same table will be sent to // the same dml - w.defragmenter = defragmenter.NewDefragmenter(encodedOutCh, workerChannels) + w.defragmenter = writer.NewDefragmenter(encodedOutCh, workerChannels) return w, nil } @@ -111,8 +111,8 @@ func NewCloudStorageDMLWorker( func (w *CloudStorageDMLWorker) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) - for i := 0; i < len(w.encodingWorkers); i++ { - encodingWorker := w.encodingWorkers[i] + for i := 0; i < len(w.workers); i++ { + encodingWorker := w.workers[i] eg.Go(func() error { return encodingWorker.Run(ctx) }) @@ -122,8 +122,8 @@ func (w *CloudStorageDMLWorker) Run(ctx context.Context) error { return w.defragmenter.Run(ctx) }) - for i := 0; i < len(w.workers); i++ { - worker := w.workers[i] + for i := 0; i < len(w.writers); i++ { + worker := w.writers[i] eg.Go(func() error { return worker.Run(ctx) }) @@ -164,21 +164,18 @@ func (w *CloudStorageDMLWorker) AddDMLEvent(event *commonEvent.DMLEvent) { } seq := atomic.AddUint64(&w.lastSeqNum, 1) + // TODO: add ObserveRows interface // w.statistics.ObserveRows(event.Rows...) // emit a TxnCallbackableEvent encoupled with a sequence number starting from one. - w.alive.msgCh.In() <- defragmenter.EventFragment{ - SeqNumber: seq, - VersionedTable: tbl, - Event: event, - } + w.alive.msgCh.In() <- writer.NewEventFragment(seq, tbl, event) } func (w *CloudStorageDMLWorker) Close() { - for _, encodingWorker := range w.encodingWorkers { + for _, encodingWorker := range w.workers { encodingWorker.Close() } - for _, worker := range w.workers { + for _, worker := range w.writers { worker.Close() } diff --git a/downstreamadapter/worker/defragmenter/defragmenter.go b/downstreamadapter/worker/writer/defragmenter.go similarity index 84% rename from downstreamadapter/worker/defragmenter/defragmenter.go rename to downstreamadapter/worker/writer/defragmenter.go index c49e7405..ede568a8 100644 --- a/downstreamadapter/worker/defragmenter/defragmenter.go +++ b/downstreamadapter/worker/writer/defragmenter.go @@ -10,7 +10,7 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. -package defragmenter +package writer import ( "context" @@ -25,17 +25,25 @@ import ( // EventFragment is used to attach a sequence number to TxnCallbackableEvent. 
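The wiring above (msgCh -> encoding workers -> encodedOutCh -> defragmenter -> per-writer channels) depends on the sequence number that AddDMLEvent attaches to each fragment before it enters the encoder pool. A minimal, self-contained sketch of that idea, using hypothetical names rather than the real ticdc types:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// fragment stands in for writer.EventFragment: a payload plus the producer-assigned seq.
type fragment struct {
	seq     uint64
	payload string
}

func main() {
	in := make(chan fragment, 16)
	out := make(chan fragment, 16)

	// A group of encoding workers, like defaultEncodingConcurrency in the real code.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for frag := range in {
				// Encoding time varies per event; this is what scrambles the order.
				time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
				frag.payload = "encoded-" + frag.payload
				out <- frag
			}
		}()
	}

	// The producer assigns a monotonically increasing sequence number, as AddDMLEvent does.
	var lastSeq uint64
	for i := 0; i < 8; i++ {
		seq := atomic.AddUint64(&lastSeq, 1)
		in <- fragment{seq: seq, payload: fmt.Sprintf("event-%d", seq)}
	}
	close(in)

	go func() { wg.Wait(); close(out) }()
	for frag := range out {
		// Typically not in seq order here; restoring order is the defragmenter's job.
		fmt.Println(frag.seq, frag.payload)
	}
}

Without the sequence number there would be no way, on the far side of the encoder pool, to tell which fragment came first.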
type EventFragment struct { - Event *commonEvent.DMLEvent - VersionedTable cloudstorage.VersionedTableName + event *commonEvent.DMLEvent + versionedTable cloudstorage.VersionedTableName // The sequence number is mainly useful for TxnCallbackableEvent defragmentation. // e.g. TxnCallbackableEvent 1~5 are dispatched to a group of encoding workers, but the // encoding completion time varies. Let's say the final completion sequence are 1,3,2,5,4, // we can use the sequence numbers to do defragmentation so that the events can arrive // at dmlWorker sequentially. - SeqNumber uint64 + seqNumber uint64 // encodedMsgs denote the encoded messages after the event is handled in encodingWorker. - EncodedMsgs []*common.Message + encodedMsgs []*common.Message +} + +func NewEventFragment(seq uint64, version cloudstorage.VersionedTableName, event *commonEvent.DMLEvent) EventFragment { + return EventFragment{ + seqNumber: seq, + versionedTable: version, + event: event, + } } // Defragmenter is used to handle event fragments which can be registered @@ -73,10 +81,10 @@ func (d *Defragmenter) Run(ctx context.Context) error { } // check whether to write messages to output channel right now next := d.lastDispatchedSeq + 1 - if frag.SeqNumber == next { + if frag.seqNumber == next { d.writeMsgsConsecutive(ctx, frag) - } else if frag.SeqNumber > next { - d.future[frag.SeqNumber] = frag + } else if frag.seqNumber > next { + d.future[frag.seqNumber] = frag } else { return nil } @@ -108,12 +116,12 @@ func (d *Defragmenter) writeMsgsConsecutive( } func (d *Defragmenter) dispatchFragToDMLWorker(frag EventFragment) { - tableName := frag.VersionedTable.TableNameWithPhysicTableID + tableName := frag.versionedTable.TableNameWithPhysicTableID d.hasher.Reset() d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table)) workerID := d.hasher.Sum32() % uint32(len(d.outputChs)) d.outputChs[workerID].In() <- frag - d.lastDispatchedSeq = frag.SeqNumber + d.lastDispatchedSeq = frag.seqNumber } func (d *Defragmenter) close() { diff --git a/downstreamadapter/worker/writer/worker.go b/downstreamadapter/worker/writer/worker.go new file mode 100644 index 00000000..9310434a --- /dev/null +++ b/downstreamadapter/worker/writer/worker.go @@ -0,0 +1,81 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package writer + +import ( + "context" + "sync/atomic" + + commonType "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/sink/codec/common" + + "github.com/pingcap/tiflow/pkg/errors" +) + +// Worker denotes the worker responsible for encoding RowChangedEvents +// to messages formatted in the specific protocol. 
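The Defragmenter above combines two small mechanisms: fragments that arrive early are parked in a map keyed by sequence number and released only once every lower number has been dispatched, and each released fragment is routed to a fixed writer by hashing its table name, so one table always lands on one writer. A compact sketch of both, with hypothetical names and a plain FNV hash standing in for the real hasher:

package main

import (
	"fmt"
	"hash/fnv"
)

type frag struct {
	seq   uint64
	table string
}

type defrag struct {
	lastSeq uint64
	future  map[uint64]frag
	outputs []chan frag
}

func (d *defrag) push(f frag) {
	d.future[f.seq] = f
	// Release every consecutive fragment we now hold, in order.
	for {
		next, ok := d.future[d.lastSeq+1]
		if !ok {
			return
		}
		delete(d.future, next.seq)
		d.lastSeq = next.seq
		d.outputs[routeByTable(next.table, len(d.outputs))] <- next
	}
}

// routeByTable mirrors the "hash the table name, mod the writer count" dispatch.
func routeByTable(table string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(table))
	return int(h.Sum32() % uint32(n))
}

func main() {
	d := &defrag{future: map[uint64]frag{}, outputs: []chan frag{make(chan frag, 8), make(chan frag, 8)}}
	for _, f := range []frag{{2, "t1"}, {1, "t2"}, {3, "t1"}} { // fragments arrive out of order
		d.push(f)
	}
	close(d.outputs[0])
	close(d.outputs[1])
	for i, ch := range d.outputs {
		for f := range ch {
			fmt.Printf("writer %d got seq %d (%s)\n", i, f.seq, f.table)
		}
	}
}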
+type Worker struct { + id int + changeFeedID commonType.ChangeFeedID + encoder common.TxnEventEncoder + isClosed uint64 + inputCh <-chan EventFragment + outputCh chan<- EventFragment +} + +func NewWorker( + workerID int, + changefeedID commonType.ChangeFeedID, + encoder common.TxnEventEncoder, + inputCh <-chan EventFragment, + outputCh chan<- EventFragment, +) *Worker { + return &Worker{ + id: workerID, + changeFeedID: changefeedID, + encoder: encoder, + inputCh: inputCh, + outputCh: outputCh, + } +} + +func (w *Worker) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case frag, ok := <-w.inputCh: + if !ok || atomic.LoadUint64(&w.isClosed) == 1 { + return nil + } + err := w.encodeEvents(frag) + if err != nil { + return errors.Trace(err) + } + } + } +} + +func (w *Worker) encodeEvents(frag EventFragment) error { + w.encoder.AppendTxnEvent(frag.event) + frag.encodedMsgs = w.encoder.Build() + w.outputCh <- frag + + return nil +} + +func (w *Worker) Close() { + if !atomic.CompareAndSwapUint64(&w.isClosed, 0, 1) { + return + } +} diff --git a/downstreamadapter/worker/cloudstorage_worker.go b/downstreamadapter/worker/writer/writer.go similarity index 79% rename from downstreamadapter/worker/cloudstorage_worker.go rename to downstreamadapter/worker/writer/writer.go index 203b1d57..972743da 100644 --- a/downstreamadapter/worker/cloudstorage_worker.go +++ b/downstreamadapter/worker/writer/writer.go @@ -10,7 +10,7 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. -package worker +package writer import ( "bytes" @@ -22,7 +22,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" - "github.com/pingcap/ticdc/downstreamadapter/worker/defragmenter" commonType "github.com/pingcap/ticdc/pkg/common" appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/metrics" @@ -39,8 +38,8 @@ import ( "golang.org/x/sync/errgroup" ) -// CloudStorageWorker denotes a worker responsible for writing messages to cloud storage. -type CloudStorageWorker struct { +// Writer denotes a worker responsible for writing messages to cloud storage. +type Writer struct { // worker id id int changeFeedID commonType.ChangeFeedID @@ -48,7 +47,7 @@ type CloudStorageWorker struct { config *cloudstorage.Config // toBeFlushedCh contains a set of batchedTask waiting to be flushed to cloud storage. toBeFlushedCh chan batchedTask - inputCh *chann.DrainableChann[defragmenter.EventFragment] + inputCh *chann.DrainableChann[EventFragment] isClosed uint64 statistics *metrics.Statistics filePathGenerator *cloudstorage.FilePathGenerator @@ -59,17 +58,17 @@ type CloudStorageWorker struct { metricsWorkerBusyRatio prometheus.Counter } -func NewCloudStorageWorker( +func NewWriter( id int, changefeedID commonType.ChangeFeedID, storage storage.ExternalStorage, config *cloudstorage.Config, extension string, - inputCh *chann.DrainableChann[defragmenter.EventFragment], + inputCh *chann.DrainableChann[EventFragment], statistics *metrics.Statistics, -) *CloudStorageWorker { +) *Writer { pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) - d := &CloudStorageWorker{ + d := &Writer{ id: id, changeFeedID: changefeedID, storage: storage, @@ -94,7 +93,7 @@ func NewCloudStorageWorker( } // Run creates a set of background goroutines. 
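Writer.Run below, like CloudStorageDMLWorker.Run earlier in this patch, follows the errgroup pattern: every background loop shares one derived context, so when any loop returns an error the siblings are cancelled and Run reports the first failure to the caller. A minimal sketch of that pattern, with hypothetical stand-ins for flushMessages and genAndDispatchTask:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func run(ctx context.Context) error {
	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error { // stands in for flushMessages
		<-ctx.Done()
		return ctx.Err()
	})
	eg.Go(func() error { // stands in for genAndDispatchTask
		time.Sleep(10 * time.Millisecond)
		return fmt.Errorf("dispatch failed") // cancels the sibling goroutine above
	})
	return eg.Wait() // the first non-nil error wins
}

func main() {
	fmt.Println(run(context.Background())) // prints "dispatch failed"
}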
-func (d *CloudStorageWorker) Run(ctx context.Context) error { +func (d *Writer) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { return d.flushMessages(ctx) @@ -109,7 +108,7 @@ func (d *CloudStorageWorker) Run(ctx context.Context) error { // flushMessages flushed messages of active tables to cloud storage. // active tables are those tables that have received events after the last flush. -func (d *CloudStorageWorker) flushMessages(ctx context.Context) error { +func (d *Writer) flushMessages(ctx context.Context) error { var flushTimeSlice time.Duration overseerDuration := d.config.FlushInterval * 2 overseerTicker := time.NewTicker(overseerDuration) @@ -198,14 +197,14 @@ func (d *CloudStorageWorker) flushMessages(ctx context.Context) error { } } -func (d *CloudStorageWorker) writeIndexFile(ctx context.Context, path, content string) error { +func (d *Writer) writeIndexFile(ctx context.Context, path, content string) error { start := time.Now() err := d.storage.WriteFile(ctx, path, []byte(content)) d.metricFlushDuration.Observe(time.Since(start).Seconds()) return err } -func (d *CloudStorageWorker) writeDataFile(ctx context.Context, path string, task *singleTableTask) error { +func (d *Writer) writeDataFile(ctx context.Context, path string, task *singleTableTask) error { var callbacks []func() buf := bytes.NewBuffer(make([]byte, 0, task.size)) rowsCnt := 0 @@ -267,8 +266,8 @@ func (d *CloudStorageWorker) writeDataFile(ctx context.Context, path string, tas // genAndDispatchTask dispatches flush tasks in two conditions: // 1. the flush interval exceeds the upper limit. // 2. the file size exceeds the upper limit. -func (d *CloudStorageWorker) genAndDispatchTask(ctx context.Context, - ch *chann.DrainableChann[defragmenter.EventFragment], +func (d *Writer) genAndDispatchTask(ctx context.Context, + ch *chann.DrainableChann[EventFragment], ) error { batchedTask := newBatchedTask() ticker := time.NewTicker(d.config.FlushInterval) @@ -300,12 +299,12 @@ func (d *CloudStorageWorker) genAndDispatchTask(ctx context.Context, if !ok || atomic.LoadUint64(&d.isClosed) == 1 { return nil } - batchedTask.HandleSingleTableEvent(frag) + batchedTask.handleSingleTableEvent(frag) // if the file size exceeds the upper limit, emit the flush task containing the table // as soon as possible. - table := frag.VersionedTable + table := frag.versionedTable if batchedTask.batch[table].size >= uint64(d.config.FileSize) { - task := batchedTask.GenerateTaskByTable(table) + task := batchedTask.generateTaskByTable(table) select { case <-ctx.Done(): return errors.Trace(ctx.Err()) @@ -319,70 +318,12 @@ func (d *CloudStorageWorker) genAndDispatchTask(ctx context.Context, } } -func (d *CloudStorageWorker) Close() { +func (d *Writer) Close() { if !atomic.CompareAndSwapUint64(&d.isClosed, 0, 1) { return } } -// CloudStorageEncodingWorker denotes the worker responsible for encoding RowChangedEvents -// to messages formatted in the specific protocol. 
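genAndDispatchTask above flushes on two triggers: the FlushInterval ticker hands whatever has accumulated to flushMessages, and a table whose pending bytes exceed FileSize is flushed immediately rather than waiting for the next tick. A stripped-down sketch of that select loop, with hypothetical names in place of batchedTask and EventFragment:

package main

import (
	"context"
	"fmt"
	"time"
)

type pending struct{ size uint64 }

func dispatch(ctx context.Context, in <-chan uint64, out chan<- pending, interval time.Duration, maxSize uint64) error {
	batch := pending{}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Time-based trigger: flush whatever has accumulated, even if small.
			out <- batch
			batch = pending{}
		case n, ok := <-in:
			if !ok {
				return nil
			}
			batch.size += n
			// Size-based trigger: flush early so one file never grows past maxSize.
			if batch.size >= maxSize {
				out <- batch
				batch = pending{}
			}
		}
	}
}

func main() {
	in := make(chan uint64, 4)
	out := make(chan pending, 4)
	in <- 3
	in <- 5 // crosses maxSize=8, so this flushes immediately
	close(in)
	_ = dispatch(context.Background(), in, out, time.Second, 8)
	fmt.Println(<-out) // {8}
}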
-type CloudStorageEncodingWorker struct { - id int - changeFeedID commonType.ChangeFeedID - encoder common.TxnEventEncoder - isClosed uint64 - inputCh <-chan defragmenter.EventFragment - outputCh chan<- defragmenter.EventFragment -} - -func NewCloudStorageEncodingWorker( - workerID int, - changefeedID commonType.ChangeFeedID, - encoder common.TxnEventEncoder, - inputCh <-chan defragmenter.EventFragment, - outputCh chan<- defragmenter.EventFragment, -) *CloudStorageEncodingWorker { - return &CloudStorageEncodingWorker{ - id: workerID, - changeFeedID: changefeedID, - encoder: encoder, - inputCh: inputCh, - outputCh: outputCh, - } -} - -func (w *CloudStorageEncodingWorker) Run(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case frag, ok := <-w.inputCh: - if !ok || atomic.LoadUint64(&w.isClosed) == 1 { - return nil - } - err := w.encodeEvents(frag) - if err != nil { - return errors.Trace(err) - } - } - } -} - -func (w *CloudStorageEncodingWorker) encodeEvents(frag defragmenter.EventFragment) error { - w.encoder.AppendTxnEvent(frag.Event) - frag.EncodedMsgs = w.encoder.Build() - w.outputCh <- frag - - return nil -} - -func (w *CloudStorageEncodingWorker) Close() { - if !atomic.CompareAndSwapUint64(&w.isClosed, 0, 1) { - return - } -} - // batchedTask contains a set of singleTableTask. // We batch message of different tables together to reduce the overhead of calling external storage API. type batchedTask struct { @@ -402,23 +343,23 @@ func newBatchedTask() batchedTask { } } -func (t *batchedTask) HandleSingleTableEvent(event defragmenter.EventFragment) { - table := event.VersionedTable +func (t *batchedTask) handleSingleTableEvent(event EventFragment) { + table := event.versionedTable if _, ok := t.batch[table]; !ok { t.batch[table] = &singleTableTask{ size: 0, - tableInfo: event.Event.TableInfo, + tableInfo: event.event.TableInfo, } } v := t.batch[table] - for _, msg := range event.EncodedMsgs { + for _, msg := range event.encodedMsgs { v.size += uint64(len(msg.Value)) } - v.msgs = append(v.msgs, event.EncodedMsgs...) + v.msgs = append(v.msgs, event.encodedMsgs...) } -func (t *batchedTask) GenerateTaskByTable(table cloudstorage.VersionedTableName) batchedTask { +func (t *batchedTask) generateTaskByTable(table cloudstorage.VersionedTableName) batchedTask { v := t.batch[table] if v == nil { log.Panic("table not found in dml task", zap.Any("table", table), zap.Any("task", t))
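Finally, batchedTask is plain bookkeeping: handleSingleTableEvent folds fragments for many tables into one map keyed by VersionedTableName, tracking the accumulated byte size per table, and generateTaskByTable splits one table's entry out so it can be flushed early on its own. A self-contained sketch of that bookkeeping, with hypothetical names:

package main

import "fmt"

type tableBatch struct {
	size uint64
	msgs [][]byte
}

type batched struct {
	batch map[string]*tableBatch
}

// add accumulates a fragment's encoded messages under its table, tracking total bytes.
func (b *batched) add(table string, msgs [][]byte) {
	t, ok := b.batch[table]
	if !ok {
		t = &tableBatch{}
		b.batch[table] = t
	}
	for _, m := range msgs {
		t.size += uint64(len(m))
	}
	t.msgs = append(t.msgs, msgs...)
}

// takeTable removes one table's accumulated batch so it can be flushed on its own,
// leaving the rest of the batch to wait for the next interval tick.
func (b *batched) takeTable(table string) batched {
	t := b.batch[table]
	delete(b.batch, table)
	return batched{batch: map[string]*tableBatch{table: t}}
}

func main() {
	b := &batched{batch: map[string]*tableBatch{}}
	b.add("db.users", [][]byte{[]byte("row-1"), []byte("row-2")})
	b.add("db.orders", [][]byte{[]byte("row-3")})
	big := b.takeTable("db.users")
	fmt.Println(len(big.batch["db.users"].msgs), b.batch["db.users"] == nil) // 2 true
}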