Skip to content

Commit

Permalink
rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
wk989898 committed Jan 24, 2025
1 parent 6a6facd commit 91613a5
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 123 deletions.
57 changes: 27 additions & 30 deletions downstreamadapter/worker/cloudstorage_dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/pingcap/ticdc/utils/chann"
"go.uber.org/zap"

"github.com/pingcap/ticdc/downstreamadapter/worker/defragmenter"
"github.com/pingcap/ticdc/downstreamadapter/worker/writer"
commonType "github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/errors"
Expand All @@ -48,18 +48,18 @@ type CloudStorageDMLWorker struct {

// last sequence number
lastSeqNum uint64
// encodingWorkers defines a group of workers for encoding events.
encodingWorkers []*CloudStorageEncodingWorker
workers []*CloudStorageWorker
// workers defines a group of workers for encoding events.
workers []*writer.Worker
writers []*writer.Writer
// defragmenter is used to defragment the out-of-order encoded messages and
// sends encoded messages to individual dmlWorkers.
defragmenter *defragmenter.Defragmenter
defragmenter *writer.Defragmenter
alive struct {
sync.RWMutex
// msgCh is a channel to hold eventFragment.
// The caller of WriteEvents will write eventFragment to msgCh and
// the encodingWorkers will read eventFragment from msgCh to encode events.
msgCh *chann.DrainableChann[defragmenter.EventFragment]
msgCh *chann.DrainableChann[writer.EventFragment]
isDead bool
}
}
Expand All @@ -73,36 +73,36 @@ func NewCloudStorageDMLWorker(
statistics *metrics.Statistics,
) (*CloudStorageDMLWorker, error) {
w := &CloudStorageDMLWorker{
changefeedID: changefeedID,
storage: storage,
config: config,
statistics: statistics,
encodingWorkers: make([]*CloudStorageEncodingWorker, defaultEncodingConcurrency),
workers: make([]*CloudStorageWorker, config.WorkerCount),
changefeedID: changefeedID,
storage: storage,
config: config,
statistics: statistics,
workers: make([]*writer.Worker, defaultEncodingConcurrency),
writers: make([]*writer.Writer, config.WorkerCount),
}
w.alive.msgCh = chann.NewAutoDrainChann[defragmenter.EventFragment]()
encodedOutCh := make(chan defragmenter.EventFragment, defaultChannelSize)
workerChannels := make([]*chann.DrainableChann[defragmenter.EventFragment], config.WorkerCount)
w.alive.msgCh = chann.NewAutoDrainChann[writer.EventFragment]()
encodedOutCh := make(chan writer.EventFragment, defaultChannelSize)
workerChannels := make([]*chann.DrainableChann[writer.EventFragment], config.WorkerCount)
// create a group of encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
encoderBuilder, err := codec.NewTxnEventEncoder(encoderConfig)
if err != nil {
return nil, err
}
w.encodingWorkers[i] = NewCloudStorageEncodingWorker(i, w.changefeedID, encoderBuilder, w.alive.msgCh.Out(), encodedOutCh)
w.workers[i] = writer.NewWorker(i, w.changefeedID, encoderBuilder, w.alive.msgCh.Out(), encodedOutCh)
}
// create a group of dml workers.
for i := 0; i < w.config.WorkerCount; i++ {
inputCh := chann.NewAutoDrainChann[defragmenter.EventFragment]()
w.workers[i] = NewCloudStorageWorker(i, w.changefeedID, storage, config, extension,
inputCh := chann.NewAutoDrainChann[writer.EventFragment]()
w.writers[i] = writer.NewWriter(i, w.changefeedID, storage, config, extension,
inputCh, w.statistics)
workerChannels[i] = inputCh
}
// create defragmenter.
// The defragmenter is used to defragment the out-of-order encoded messages from encoding workers and
// sends encoded messages to related dmlWorkers in order. Messages of the same table will be sent to
// the same dml
w.defragmenter = defragmenter.NewDefragmenter(encodedOutCh, workerChannels)
w.defragmenter = writer.NewDefragmenter(encodedOutCh, workerChannels)

return w, nil
}
Expand All @@ -111,8 +111,8 @@ func NewCloudStorageDMLWorker(
func (w *CloudStorageDMLWorker) Run(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)

for i := 0; i < len(w.encodingWorkers); i++ {
encodingWorker := w.encodingWorkers[i]
for i := 0; i < len(w.workers); i++ {
encodingWorker := w.workers[i]
eg.Go(func() error {
return encodingWorker.Run(ctx)
})
Expand All @@ -122,8 +122,8 @@ func (w *CloudStorageDMLWorker) Run(ctx context.Context) error {
return w.defragmenter.Run(ctx)
})

for i := 0; i < len(w.workers); i++ {
worker := w.workers[i]
for i := 0; i < len(w.writers); i++ {
worker := w.writers[i]
eg.Go(func() error {
return worker.Run(ctx)
})
Expand Down Expand Up @@ -164,21 +164,18 @@ func (w *CloudStorageDMLWorker) AddDMLEvent(event *commonEvent.DMLEvent) {
}
seq := atomic.AddUint64(&w.lastSeqNum, 1)

// TODO: add ObserveRows interface
// w.statistics.ObserveRows(event.Rows...)
// emit a TxnCallbackableEvent encoupled with a sequence number starting from one.
w.alive.msgCh.In() <- defragmenter.EventFragment{
SeqNumber: seq,
VersionedTable: tbl,
Event: event,
}
w.alive.msgCh.In() <- writer.NewEventFragment(seq, tbl, event)
}

func (w *CloudStorageDMLWorker) Close() {
for _, encodingWorker := range w.encodingWorkers {
for _, encodingWorker := range w.workers {
encodingWorker.Close()
}

for _, worker := range w.workers {
for _, worker := range w.writers {
worker.Close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package defragmenter
package writer

import (
"context"
Expand All @@ -25,17 +25,25 @@ import (

// EventFragment is used to attach a sequence number to TxnCallbackableEvent.
type EventFragment struct {
	// event is the DML event to be encoded.
	event *commonEvent.DMLEvent
	// versionedTable identifies the (versioned) table the event belongs to;
	// the defragmenter hashes it to route the fragment to a fixed writer.
	versionedTable cloudstorage.VersionedTableName

	// The sequence number is mainly useful for TxnCallbackableEvent defragmentation.
	// e.g. TxnCallbackableEvent 1~5 are dispatched to a group of encoding workers, but the
	// encoding completion time varies. Let's say the final completion sequence are 1,3,2,5,4,
	// we can use the sequence numbers to do defragmentation so that the events can arrive
	// at dmlWorker sequentially.
	seqNumber uint64
	// encodedMsgs denote the encoded messages after the event is handled in encodingWorker.
	encodedMsgs []*common.Message
}

// NewEventFragment bundles a DML event with its versioned table and the
// sequence number that the defragmenter later uses to restore order.
func NewEventFragment(seq uint64, version cloudstorage.VersionedTableName, event *commonEvent.DMLEvent) EventFragment {
	var frag EventFragment
	frag.seqNumber = seq
	frag.versionedTable = version
	frag.event = event
	return frag
}

// Defragmenter is used to handle event fragments which can be registered
Expand Down Expand Up @@ -73,10 +81,10 @@ func (d *Defragmenter) Run(ctx context.Context) error {
}
// check whether to write messages to output channel right now
next := d.lastDispatchedSeq + 1
if frag.SeqNumber == next {
if frag.seqNumber == next {
d.writeMsgsConsecutive(ctx, frag)
} else if frag.SeqNumber > next {
d.future[frag.SeqNumber] = frag
} else if frag.seqNumber > next {
d.future[frag.seqNumber] = frag
} else {
return nil
}
Expand Down Expand Up @@ -108,12 +116,12 @@ func (d *Defragmenter) writeMsgsConsecutive(
}

// dispatchFragToDMLWorker routes one defragmented fragment to a writer
// input channel. The target channel is chosen by hashing the physical
// table's schema and table name, so every fragment of a given table always
// lands on the same writer and preserves its relative order.
func (d *Defragmenter) dispatchFragToDMLWorker(frag EventFragment) {
	tableName := frag.versionedTable.TableNameWithPhysicTableID
	d.hasher.Reset()
	d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
	workerID := d.hasher.Sum32() % uint32(len(d.outputChs))
	d.outputChs[workerID].In() <- frag
	// Record the last sequence handed downstream; Run compares against this
	// to decide when buffered out-of-order fragments become consecutive.
	d.lastDispatchedSeq = frag.seqNumber
}

func (d *Defragmenter) close() {
Expand Down
81 changes: 81 additions & 0 deletions downstreamadapter/worker/writer/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package writer

import (
"context"
"sync/atomic"

commonType "github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/sink/codec/common"

"github.com/pingcap/tiflow/pkg/errors"
)

// Worker denotes the worker responsible for encoding RowChangedEvents
// to messages formatted in the specific protocol.
type Worker struct {
	// id distinguishes this worker inside the group of encoding workers.
	id int
	// changeFeedID identifies the changefeed this worker serves.
	changeFeedID commonType.ChangeFeedID
	// encoder turns a txn event into protocol-specific messages.
	encoder common.TxnEventEncoder
	// isClosed is a 0/1 flag accessed only via sync/atomic; Close sets it to 1.
	isClosed uint64
	// inputCh delivers fragments to encode; encoded fragments are forwarded
	// on outputCh (toward the defragmenter).
	inputCh  <-chan EventFragment
	outputCh chan<- EventFragment
}

// NewWorker creates an encoding Worker that reads fragments from inputCh,
// encodes them with the given encoder, and emits the results on outputCh.
func NewWorker(
	workerID int,
	changefeedID commonType.ChangeFeedID,
	encoder common.TxnEventEncoder,
	inputCh <-chan EventFragment,
	outputCh chan<- EventFragment,
) *Worker {
	w := new(Worker)
	w.id = workerID
	w.changeFeedID = changefeedID
	w.encoder = encoder
	w.inputCh = inputCh
	w.outputCh = outputCh
	return w
}

// Run keeps encoding fragments from the input channel until the context is
// cancelled, the channel is closed, or the worker has been marked closed.
func (w *Worker) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return errors.Trace(ctx.Err())
		case frag, ok := <-w.inputCh:
			// A closed channel or a set isClosed flag both mean shutdown.
			if !ok || atomic.LoadUint64(&w.isClosed) == 1 {
				return nil
			}
			if err := w.encodeEvents(frag); err != nil {
				return errors.Trace(err)
			}
		}
	}
}

// encodeEvents feeds the fragment's event into the encoder, attaches the
// built messages to the fragment, and forwards it downstream.
//
// NOTE(review): the result of encoder.AppendTxnEvent is not inspected here;
// if that method can return an error, it is silently dropped — confirm its
// signature and propagate the error if so.
// NOTE(review): the send on outputCh does not observe ctx cancellation; a
// stalled consumer (defragmenter) could block shutdown — verify against the
// channel's lifecycle.
func (w *Worker) encodeEvents(frag EventFragment) error {
	w.encoder.AppendTxnEvent(frag.event)
	frag.encodedMsgs = w.encoder.Build()
	w.outputCh <- frag

	return nil
}

// Close marks the worker as closed. Run observes the flag and exits after
// the next fragment (or channel close) is received.
func (w *Worker) Close() {
	// The CAS makes the transition idempotent: only the first caller flips
	// the flag from 0 to 1; later calls are no-ops.
	atomic.CompareAndSwapUint64(&w.isClosed, 0, 1)
}
Loading

0 comments on commit 91613a5

Please sign in to comment.