diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index 3de51a94047..29da294169f 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -732,6 +732,13 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) { LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was finish"; post_fullsync_cb_(); + // It needs to reload namespaces from DB after the full sync is done, + // or namespaces are not visible in the replica. + s = srv_->GetNamespace()->LoadAndRewrite(); + if (!s.IsOK()) { + LOG(ERROR) << "[replication] Failed to load and rewrite namespace: " << s.Msg(); + } + // Switch to psync state machine again psync_steps_.Start(); return CBState::QUIT; diff --git a/src/server/namespace.cc b/src/server/namespace.cc index e10c08dc29b..d1f7c228483 100644 --- a/src/server/namespace.cc +++ b/src/server/namespace.cc @@ -54,7 +54,8 @@ bool Namespace::IsAllowModify() const { Status Namespace::loadFromDB(std::map* db_tokens) const { std::string value; engine::Context ctx(storage_); - auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf_, kNamespaceDBKey, &value); + auto cf = storage_->GetCFHandle(ColumnFamilyID::Propagate); + auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf, kNamespaceDBKey, &value); if (!s.ok()) { if (s.IsNotFound()) return Status::OK(); return {Status::NotOK, s.ToString()}; diff --git a/src/server/namespace.h b/src/server/namespace.h index 3e22d382419..d2b45cf3d2a 100644 --- a/src/server/namespace.h +++ b/src/server/namespace.h @@ -26,8 +26,7 @@ constexpr const char *kNamespaceDBKey = "__namespace_keys__"; class Namespace { public: - explicit Namespace(engine::Storage *storage) - : storage_(storage), cf_(storage_->GetCFHandle(ColumnFamilyID::Propagate)) {} + explicit Namespace(engine::Storage *storage) : storage_(storage) {} ~Namespace() = default; Namespace(const Namespace &) = delete; @@ -45,7 +44,6 @@ class Namespace { private: engine::Storage *storage_; - rocksdb::ColumnFamilyHandle *cf_ = nullptr; std::shared_mutex tokens_mu_; // mapping from token to namespace name diff --git a/tests/gocase/unit/namespace/namespace_test.go b/tests/gocase/unit/namespace/namespace_test.go index 9409b61a5a7..ff11af84757 100644 --- a/tests/gocase/unit/namespace/namespace_test.go +++ b/tests/gocase/unit/namespace/namespace_test.go @@ -21,6 +21,8 @@ package namespace import ( "context" + "fmt" + "strings" "sync" "testing" "time" @@ -252,6 +254,43 @@ func TestNamespaceReplicate(t *testing.T) { }) } +func TestNamespaceReplicateWithFullSync(t *testing.T) { + config := map[string]string{ + "rocksdb.write_buffer_size": "4", + "rocksdb.target_file_size_base": "16", + "rocksdb.max_write_buffer_number": "1", + "rocksdb.wal_ttl_seconds": "0", + "rocksdb.wal_size_limit_mb": "0", + "repl-namespace-enabled": "yes", + "requirepass": "123", + "masterauth": "123", + } + master := util.StartServer(t, config) + defer master.Close() + masterClient := master.NewClientWithOption(&redis.Options{Password: "123"}) + defer func() { require.NoError(t, masterClient.Close()) }() + + slave := util.StartServer(t, config) + defer slave.Close() + slaveClient := slave.NewClientWithOption(&redis.Options{Password: "123"}) + defer func() { require.NoError(t, slaveClient.Close()) }() + + ctx := context.Background() + value := strings.Repeat("a", 128*1024) + for i := 0; i < 1024; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), value, 0).Err()) + } + require.NoError(t, masterClient.Do(ctx, "NAMESPACE", "ADD", "foo", "bar").Err()) + + util.SlaveOf(t, slaveClient, master) + util.WaitForOffsetSync(t, masterClient, slaveClient, 60*time.Second) + + // Namespaces should be replicated after the full sync + token, err := slaveClient.Do(ctx, "NAMESPACE", "GET", "foo").Result() + require.NoError(t, err) + require.EqualValues(t, "bar", token) +} + func TestNamespaceRewrite(t *testing.T) { password := "pwd" srv := util.StartServerWithCLIOptions(t, false, map[string]string{