Skip to content

Commit

Permalink
chore(tests): add basic test case for the full+partial sync (#2522)
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk authored Sep 6, 2024
1 parent c75ad64 commit 2836bf0
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 15 deletions.
4 changes: 2 additions & 2 deletions tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func TestClusterMultiple(t *testing.T) {
// request replicas a write command, it's wrong
require.ErrorContains(t, rdb[3].Set(ctx, util.SlotTable[16383], 16383, 0).Err(), "MOVED")
// request a read-only command to node3 that serve slot 16383, that's ok
util.WaitForOffsetSync(t, rdb[2], rdb[3])
util.WaitForOffsetSync(t, rdb[2], rdb[3], 5*time.Second)
//the default option is READWRITE, which will redirect both read and write to master
require.ErrorContains(t, rdb[3].Get(ctx, util.SlotTable[16383]).Err(), "MOVED")

Expand Down Expand Up @@ -445,7 +445,7 @@ func TestClusterMultiple(t *testing.T) {

require.NoError(t, rdb[3].Do(ctx, "READONLY").Err())
require.NoError(t, rdb[2].Set(ctx, util.SlotTable[8192], 8192, 0).Err())
util.WaitForOffsetSync(t, rdb[2], rdb[3])
util.WaitForOffsetSync(t, rdb[2], rdb[3], 5*time.Second)
// request node3 that serves slot 8192, that's ok
require.Equal(t, "8192", rdb[3].Get(ctx, util.SlotTable[8192]).Val())

Expand Down
50 changes: 47 additions & 3 deletions tests/gocase/integration/replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestClusterReplication(t *testing.T) {
require.Equal(t, "slave", util.FindInfoEntry(replicaClient, "role"))
masterClient.Set(ctx, "k0", "v0", 0)
masterClient.LPush(ctx, "k1", "e0", "e1", "e2")
util.WaitForOffsetSync(t, masterClient, replicaClient)
util.WaitForOffsetSync(t, masterClient, replicaClient, 5*time.Second)

require.Equal(t, "v0", replicaClient.Get(ctx, "k0").Val())
require.Equal(t, []string{"e2", "e1", "e0"}, replicaClient.LRange(ctx, "k1", 0, -1).Val())
Expand All @@ -84,7 +84,7 @@ func TestClusterReplication(t *testing.T) {
// allow to run the read-only command in the replica
require.NoError(t, replicaClient.ReadOnly(ctx).Err())

util.WaitForOffsetSync(t, masterClient, replicaClient)
util.WaitForOffsetSync(t, masterClient, replicaClient, 5*time.Second)
require.Equal(t, "v1", replicaClient.Get(ctx, "k0").Val())
require.Equal(t, map[string]string{"f0": "v0", "f1": "v1"}, replicaClient.HGetAll(ctx, "k2").Val())
})
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestReplicationContinueRunning(t *testing.T) {
"0": 0, "1": 1, "2": 2, "3": 3, "4": 4, "5": 5, "6": 6, "7": 7, "8": 8, "9": 9,
"a": "a", "b": "b", "c": "c", "d": "d", "e": "e", "f": "f", "g": "g", "h": "h", "i": "i", "j": "j", "k": "k"})
require.EqualValues(t, 21, masterClient.HLen(ctx, "myhash").Val())
util.WaitForOffsetSync(t, masterClient, slaveClient)
util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second)
require.Equal(t, "1", slaveClient.HGet(ctx, "myhash", "1").Val())
require.Equal(t, "a", slaveClient.HGet(ctx, "myhash", "a").Val())
})
Expand Down Expand Up @@ -508,3 +508,47 @@ func TestShouldNotReplicate(t *testing.T) {
require.Equal(t, "master", util.FindInfoEntry(masterClient, "role"))
})
}

func TestFullSyncReplication(t *testing.T) {
master := util.StartServer(t, map[string]string{
"rocksdb.write_buffer_size": "4",
"rocksdb.target_file_size_base": "16",
"rocksdb.max_write_buffer_number": "1",
"rocksdb.wal_ttl_seconds": "0",
"rocksdb.wal_size_limit_mb": "0",
})
defer master.Close()
masterClient := master.NewClient()
defer func() { require.NoError(t, masterClient.Close()) }()

slave := util.StartServer(t, map[string]string{})
defer slave.Close()
slaveClient := slave.NewClient()
defer func() { require.NoError(t, slaveClient.Close()) }()

ctx := context.Background()

t.Run("Full sync replication should work correctly", func(t *testing.T) {
value := strings.Repeat("a", 128*1024)
for i := 0; i < 1024; i++ {
require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), value, 0).Err())
}

util.SlaveOf(t, slaveClient, master)
// Wait more time for full sync to avoid flake test in CI environment
util.WaitForOffsetSync(t, masterClient, slaveClient, 60*time.Second)

// Make sure the full sync happened in replication
syncFullCount, err := strconv.Atoi(util.FindInfoEntry(masterClient, "sync_full"))
require.NoError(t, err)
require.Greater(t, syncFullCount, 0)

got, err := slaveClient.Get(ctx, "key1").Result()
require.NoError(t, err)
require.Equal(t, value, got)

require.NoError(t, masterClient.Set(ctx, "foo", "bar", 0).Err())
util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second)
require.Equal(t, "bar", slaveClient.Get(ctx, "foo").Val())
})
}
2 changes: 1 addition & 1 deletion tests/gocase/integration/rsid/rsid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestRSIDMasterAndReplicaYes(t *testing.T) {

t.Run("chained replication can propagate updates", func(t *testing.T) {
require.NoError(t, rdbB.Set(ctx, "master", "B", 0).Err())
util.WaitForOffsetSync(t, rdbB, rdbA)
util.WaitForOffsetSync(t, rdbB, rdbA, 5*time.Second)
require.Equal(t, "B", rdbA.Get(ctx, "master").Val())
})

Expand Down
4 changes: 2 additions & 2 deletions tests/gocase/tls/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestTLSReplica(t *testing.T) {
require.Equal(t, rc.Get(ctx, "b").Val(), "")
require.NoError(t, sc.Set(ctx, "a", "1", 0).Err())
require.NoError(t, sc.Set(ctx, "b", "2", 0).Err())
util.WaitForOffsetSync(t, sc, rc)
util.WaitForOffsetSync(t, sc, rc, 5*time.Second)
require.Equal(t, rc.Get(ctx, "a").Val(), "1")
require.Equal(t, rc.Get(ctx, "b").Val(), "2")
})
Expand All @@ -189,7 +189,7 @@ func TestTLSReplica(t *testing.T) {
defer func() { require.NoError(t, rc2.Close()) }()

t.Run("TLS: Replication (full)", func(t *testing.T) {
util.WaitForOffsetSync(t, sc, rc2)
util.WaitForOffsetSync(t, sc, rc2, 5*time.Second)
require.Equal(t, rc2.Get(ctx, "a").Val(), "1")
require.Equal(t, rc2.Get(ctx, "b").Val(), "2")
require.Equal(t, rc2.Get(ctx, "c").Val(), "3")
Expand Down
7 changes: 4 additions & 3 deletions tests/gocase/unit/namespace/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"sync"
"testing"
"time"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -181,7 +182,7 @@ func TestNamespaceReplicate(t *testing.T) {
require.NoError(t, r.Err())
require.Equal(t, "OK", r.Val())
}
util.WaitForOffsetSync(t, slaveRdb, masterRdb)
util.WaitForOffsetSync(t, slaveRdb, masterRdb, 5*time.Second)

// Can read namespaces on master
for ns, token := range nsTokens {
Expand Down Expand Up @@ -211,7 +212,7 @@ func TestNamespaceReplicate(t *testing.T) {
require.NoError(t, r.Err())
require.Equal(t, "OK", r.Val())
}
util.WaitForOffsetSync(t, slaveRdb, masterRdb)
util.WaitForOffsetSync(t, slaveRdb, masterRdb, 5*time.Second)

for ns, token := range nsTokens {
r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
Expand All @@ -224,7 +225,7 @@ func TestNamespaceReplicate(t *testing.T) {
require.NoError(t, r.Err())
require.Equal(t, "OK", r.Val())
}
util.WaitForOffsetSync(t, slaveRdb, masterRdb)
util.WaitForOffsetSync(t, slaveRdb, masterRdb, 5*time.Second)

for ns := range nsTokens {
r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
Expand Down
5 changes: 3 additions & 2 deletions tests/gocase/unit/scripting/scripting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"math/big"
"testing"
"time"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -510,12 +511,12 @@ func TestScriptingMasterSlave(t *testing.T) {
t.Run("SCRIPTING: script load on master, read on slave", func(t *testing.T) {
sha := masterClient.ScriptLoad(ctx, `return 'script loaded'`).Val()
require.Equal(t, "4167ea82ed9c381c7659f7cf93f394219147e8c4", sha)
util.WaitForOffsetSync(t, masterClient, slaveClient)
util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second)
require.Equal(t, []bool{true}, masterClient.ScriptExists(ctx, sha).Val())
require.Equal(t, []bool{true}, slaveClient.ScriptExists(ctx, sha).Val())

require.NoError(t, masterClient.ScriptFlush(ctx).Err())
util.WaitForOffsetSync(t, masterClient, slaveClient)
util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second)
require.Equal(t, []bool{false}, masterClient.ScriptExists(ctx, sha).Val())
require.Equal(t, []bool{false}, slaveClient.ScriptExists(ctx, sha).Val())
})
Expand Down
4 changes: 2 additions & 2 deletions tests/gocase/util/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func WaitForSync(t testing.TB, slave *redis.Client) {
}, 5*time.Second, 100*time.Millisecond)
}

func WaitForOffsetSync(t testing.TB, master, slave *redis.Client) {
func WaitForOffsetSync(t testing.TB, master, slave *redis.Client, waitFor time.Duration) {
require.Eventually(t, func() bool {
o1 := FindInfoEntry(master, "master_repl_offset")
o2 := FindInfoEntry(slave, "master_repl_offset")
return o1 == o2
}, 5*time.Second, 100*time.Millisecond)
}, waitFor, 100*time.Millisecond)
}

func SlaveOf(t testing.TB, slave *redis.Client, master *KvrocksServer) {
Expand Down

0 comments on commit 2836bf0

Please sign in to comment.