Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: fix integration test capture_suicide_while_balance_table #952

Merged
merged 11 commits into from
Jan 24, 2025
Merged
18 changes: 16 additions & 2 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,31 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=savepoint

# The 14th case in this group
- name: Test server config compatibility
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=server_config_compatibility

# The 15th case in this group
- name: Test split region
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=split_region

# The 16th case in this group
- name: Test changefeed resume with checkpoint ts
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=changefeed_resume_with_checkpoint_ts

- name: Test capture suicide while balance table
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=capture_suicide_while_balance_table

- name: Test kv client stream reconnect
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=kv_client_stream_reconnect

- name: Upload test logs
if: always()
Expand Down
24 changes: 24 additions & 0 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -173,6 +174,17 @@ func (s *regionRequestWorker) run(ctx context.Context, credential *security.Cred
return s.receiveAndDispatchChangeEvents(conn)
})
g.Go(func() error { return s.processRegionSendTask(gctx, conn) })

failpoint.Inject("InjectForceReconnect", func() {
timer := time.After(10 * time.Second)
g.Go(func() error {
<-timer
err := errors.New("inject force reconnect")
log.Info("inject force reconnect", zap.Error(err))
return err
})
})

_ = g.Wait()
return isCanceled()
}
Expand Down Expand Up @@ -427,3 +439,15 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo {
}
return regions
}

func (s *regionRequestWorker) getAllRegionStates() regionFeedStates {
s.requestedRegions.RLock()
defer s.requestedRegions.RUnlock()
states := make(regionFeedStates)
for _, statesMap := range s.requestedRegions.subscriptions {
for regionID, state := range statesMap {
states[regionID] = state
}
}
return states
}
2 changes: 1 addition & 1 deletion logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
kvclientv2 "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
Expand Down
2 changes: 1 addition & 1 deletion maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (c *Controller) addNewSpans(schemaID int64, tableSpans []*heartbeatpb.Table
}

func (c *Controller) loadTables(startTs uint64) ([]commonEvent.Table, error) {
// todo: do we need to set timezone here?
// Use a empty timezone because table filter does not need it.
f, err := filter.NewFilter(c.cfConfig.Filter, "", c.cfConfig.CaseSensitive, c.cfConfig.ForceReplicate)
if err != nil {
return nil, errors.Cause(err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sink/mysql/mysql_writer_dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/pkg/parser/mysql"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"go.uber.org/zap"
Expand Down Expand Up @@ -170,6 +171,8 @@ func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error {
failpoint.Return(err)
})

failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(w.ctx, time.Hour) })

failpoint.Inject("MySQLDuplicateEntryError", func() {
log.Warn("inject MySQLDuplicateEntryError")
err := cerror.WrapError(cerror.ErrMySQLDuplicateEntry, &dmysql.MySQLError{
Expand Down
58 changes: 58 additions & 0 deletions pkg/util/atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

type numbers interface {
int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | uintptr | float32 | float64
}

type genericAtomic[T numbers] interface {
Load() T
Store(T)
CompareAndSwap(old, new T) bool
}

// CompareAndIncrease updates the target if the new value is larger than or equal to the old value.
// It returns false if the new value is smaller than the old value.
func CompareAndIncrease[T numbers](target genericAtomic[T], new T) bool {
for {
old := target.Load()
if new < old {
return false
}
if new == old || target.CompareAndSwap(old, new) {
return true
}
}
}

// CompareAndMonotonicIncrease updates the target if the new value is larger than the old value.
// It returns false if the new value is smaller than or equal to the old value.
func CompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) bool {
for {
old := target.Load()
if new <= old {
return false
}
if target.CompareAndSwap(old, new) {
return true
}
}
}

// MustCompareAndMonotonicIncrease updates the target if the new value is larger than the old value. It do nothing
// if the new value is smaller than or equal to the old value.
func MustCompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) {
_ = CompareAndMonotonicIncrease(target, new)
}
106 changes: 106 additions & 0 deletions pkg/util/atomic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"math/rand"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
)

func TestMustCompareAndIncrease(t *testing.T) {
t.Parallel()

var target atomic.Int64
target.Store(10)

ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}

doIncrease := func() {
for {
select {
case <-ctx.Done():
return
default:
delta := rand.Int63n(100)
v := target.Load() + delta
MustCompareAndMonotonicIncrease(&target, v)
require.GreaterOrEqual(t, target.Load(), v)
}
}
}

// Test target increase.
wg.Add(2)
go func() {
defer wg.Done()
doIncrease()
}()
go func() {
defer wg.Done()
doIncrease()
}()

wg.Add(1)
// Test target never decrease.
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
v := target.Load() - 1
MustCompareAndMonotonicIncrease(&target, v)
require.Greater(t, target.Load(), v)
}
}
}()

cancel()
wg.Wait()
}

func TestCompareAndIncrease(t *testing.T) {
t.Parallel()

var target atomic.Int64
target.Store(10)
require.True(t, CompareAndIncrease(&target, 10))
require.Equal(t, int64(10), target.Load())

require.True(t, CompareAndIncrease(&target, 20))
require.Equal(t, int64(20), target.Load())
require.False(t, CompareAndIncrease(&target, 19))
require.Equal(t, int64(20), target.Load())
}

func TestCompareAndMonotonicIncrease(t *testing.T) {
t.Parallel()

var target atomic.Int64
target.Store(10)
require.False(t, CompareAndMonotonicIncrease(&target, 10))
require.Equal(t, int64(10), target.Load())

require.True(t, CompareAndMonotonicIncrease(&target, 11))
require.Equal(t, int64(11), target.Load())
require.False(t, CompareAndMonotonicIncrease(&target, 10))
require.Equal(t, int64(11), target.Load())
}
16 changes: 16 additions & 0 deletions pkg/util/net_util.go → pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package util

import (
"context"
"net"
"strconv"
"time"

"github.com/pingcap/errors"
)
Expand All @@ -33,3 +35,17 @@ func ParseHostAndPortFromAddress(address string) (string, uint, error) {
}
return host, uint(portNumeric), nil
}

// Hang will block the goroutine for a given duration, or return when `ctx` is done.
func Hang(ctx context.Context, dur time.Duration) error {
timer := time.NewTimer(dur)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return ctx.Err()
case <-timer.C:
return nil
}
}
Loading
Loading