From 495a54fcc6a336886a8d6b6d186c370dc3de3fa1 Mon Sep 17 00:00:00 2001 From: Taein Lim <101996424+taeng0204@users.noreply.github.com> Date: Wed, 7 Aug 2024 22:12:35 +0900 Subject: [PATCH 01/18] Add HTTP health check handler for server health monitoring (#952) Added the handler to allow health checks to be performed with plain HTTP GET requests needed for traditional uptime checker or load balancer, along with existing gRPC health check. --- server/rpc/httphealth/httphealth.go | 62 ++++++++++++++++++++ server/rpc/server.go | 15 +++-- test/integration/health_test.go | 91 +++++++++++++++++++++++++++-- 3 files changed, 156 insertions(+), 12 deletions(-) create mode 100644 server/rpc/httphealth/httphealth.go diff --git a/server/rpc/httphealth/httphealth.go b/server/rpc/httphealth/httphealth.go new file mode 100644 index 000000000..0b026cfe3 --- /dev/null +++ b/server/rpc/httphealth/httphealth.go @@ -0,0 +1,62 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package httphealth uses http GET to provide a health check for the server. +package httphealth + +import ( + "encoding/json" + "net/http" + + "connectrpc.com/grpchealth" +) + +// CheckResponse represents the response structure for health checks. +type CheckResponse struct { + Status string `json:"status"` +} + +// NewHandler creates a new HTTP handler for health checks. +func NewHandler(checker grpchealth.Checker) (string, http.Handler) { + const serviceName = "/healthz/" + check := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + var checkRequest grpchealth.CheckRequest + service := r.URL.Query().Get("service") + if service != "" { + checkRequest.Service = service + } + checkResponse, err := checker.Check(r.Context(), &checkRequest) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + resp, err := json.Marshal(CheckResponse{checkResponse.Status.String()}) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if _, err := w.Write(resp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + }) + return serviceName, check +} diff --git a/server/rpc/server.go b/server/rpc/server.go index 85f57b521..cfd06ad88 100644 --- a/server/rpc/server.go +++ b/server/rpc/server.go @@ -36,6 +36,7 @@ import ( "github.com/yorkie-team/yorkie/server/backend" "github.com/yorkie-team/yorkie/server/logging" "github.com/yorkie-team/yorkie/server/rpc/auth" + "github.com/yorkie-team/yorkie/server/rpc/httphealth" "github.com/yorkie-team/yorkie/server/rpc/interceptors" ) @@ -62,16 +63,18 @@ func NewServer(conf *Config, be *backend.Backend) (*Server, error) { ), } - yorkieServiceCtx, yorkieServiceCancel := context.WithCancel(context.Background()) - mux := http.NewServeMux() - mux.Handle(v1connect.NewYorkieServiceHandler(newYorkieServer(yorkieServiceCtx, be), opts...)) - mux.Handle(v1connect.NewAdminServiceHandler(newAdminServer(be, tokenManager), opts...)) - mux.Handle(grpchealth.NewHandler(grpchealth.NewStaticChecker( + healthChecker := grpchealth.NewStaticChecker( grpchealth.HealthV1ServiceName, v1connect.YorkieServiceName, v1connect.AdminServiceName, - ))) + ) + yorkieServiceCtx, yorkieServiceCancel := context.WithCancel(context.Background()) + mux := http.NewServeMux() + mux.Handle(v1connect.NewYorkieServiceHandler(newYorkieServer(yorkieServiceCtx, be), opts...)) + mux.Handle(v1connect.NewAdminServiceHandler(newAdminServer(be, tokenManager), opts...)) + mux.Handle(grpchealth.NewHandler(healthChecker)) + mux.Handle(httphealth.NewHandler(healthChecker)) // TODO(hackerwins): We need to provide proper http server configuration. return &Server{ conf: conf, diff --git a/test/integration/health_test.go b/test/integration/health_test.go index 6ee176de5..50dd45702 100644 --- a/test/integration/health_test.go +++ b/test/integration/health_test.go @@ -20,16 +20,26 @@ package integration import ( "context" + "encoding/json" + "net/http" "testing" + "connectrpc.com/grpchealth" "github.com/stretchr/testify/assert" + "github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect" + "github.com/yorkie-team/yorkie/server/rpc/httphealth" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" healthpb "google.golang.org/grpc/health/grpc_health_v1" ) -func TestHealthCheck(t *testing.T) { - // use gRPC health check +var services = []string{ + grpchealth.HealthV1ServiceName, + v1connect.YorkieServiceName, + v1connect.AdminServiceName, +} + +func TestRPCHealthCheck(t *testing.T) { conn, err := grpc.Dial( defaultServer.RPCAddr(), grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -38,9 +48,78 @@ func TestHealthCheck(t *testing.T) { defer func() { assert.NoError(t, conn.Close()) }() - cli := healthpb.NewHealthClient(conn) - resp, err := cli.Check(context.Background(), &healthpb.HealthCheckRequest{}) - assert.NoError(t, err) - assert.Equal(t, resp.Status, healthpb.HealthCheckResponse_SERVING) + + // check default service + t.Run("Service: default", func(t *testing.T) { + resp, err := cli.Check(context.Background(), &healthpb.HealthCheckRequest{}) + assert.NoError(t, err) + assert.Equal(t, resp.Status, healthpb.HealthCheckResponse_SERVING) + }) + + // check all services + for _, s := range services { + service := s + t.Run("Service: "+service, func(t *testing.T) { + resp, err := cli.Check(context.Background(), &healthpb.HealthCheckRequest{ + Service: service, + }) + assert.NoError(t, err) + assert.Equal(t, resp.Status, healthpb.HealthCheckResponse_SERVING) + }) + } + + // check unknown service + t.Run("Service: unknown", func(t *testing.T) { + _, err := cli.Check(context.Background(), &healthpb.HealthCheckRequest{ + Service: "unknown", + }) + assert.Error(t, err) + }) +} + +func TestHTTPHealthCheck(t *testing.T) { + // check default service + t.Run("Service: default", func(t *testing.T) { + resp, err := http.Get("http://" + defaultServer.RPCAddr() + "/healthz/") + assert.NoError(t, err) + defer func() { + assert.NoError(t, resp.Body.Close()) + }() + assert.Equal(t, resp.StatusCode, http.StatusOK) + + var healthResp httphealth.CheckResponse + err = json.NewDecoder(resp.Body).Decode(&healthResp) + assert.NoError(t, err) + assert.Equal(t, healthResp.Status, grpchealth.StatusServing.String()) + }) + + // check all services + for _, s := range services { + service := s + t.Run("Service: "+service, func(t *testing.T) { + url := "http://" + defaultServer.RPCAddr() + "/healthz/?service=" + service + resp, err := http.Get(url) + assert.NoError(t, err) + defer func() { + assert.NoError(t, resp.Body.Close()) + }() + assert.Equal(t, resp.StatusCode, http.StatusOK) + + var healthResp httphealth.CheckResponse + err = json.NewDecoder(resp.Body).Decode(&healthResp) + assert.NoError(t, err) + assert.Equal(t, healthResp.Status, grpchealth.StatusServing.String()) + }) + } + + // check unknown service + t.Run("Service: unknown", func(t *testing.T) { + resp, err := http.Get("http://" + defaultServer.RPCAddr() + "/healthz/?service=unknown") + assert.NoError(t, err) + defer func() { + assert.NoError(t, resp.Body.Close()) + }() + assert.Equal(t, resp.StatusCode, http.StatusNotFound) + }) } From 4344fce28c5e7b9c26fb079f9f16b3b405d4d18c Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sat, 10 Aug 2024 02:50:55 +0900 Subject: [PATCH 02/18] Implement validation method to ensure sequential ClientSeq in reqPack.Changes --- server/packs/packs.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/server/packs/packs.go b/server/packs/packs.go index f37d94655..193932743 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -20,6 +20,7 @@ package packs import ( "context" + "errors" "fmt" "strconv" gotime "time" @@ -37,6 +38,11 @@ import ( "github.com/yorkie-team/yorkie/server/logging" ) +var ( + // ErrClientSeqNotSequential is returned when ClientSeq in reqPack.Changes are not sequential + ErrClientSeqNotSequential = errors.New("ClientSeq in reqPack.Changes are not sequential") +) + // PushPullKey creates a new sync.Key of PushPull for the given document. func PushPullKey(projectID types.ID, docKey key.Key) sync.Key { return sync.NewKey(fmt.Sprintf("pushpull-%s-%s", projectID, docKey)) @@ -77,6 +83,11 @@ func PushPull( // TODO: Changes may be reordered or missing during communication on the network. // We should check the change.pack with checkpoint to make sure the changes are in the correct order. + err := validateClientSeqSequential(reqPack.Changes) + if err != nil { + return nil, err + } + initialServerSeq := docInfo.ServerSeq // 01. push changes: filter out the changes that are already saved in the database. @@ -272,3 +283,24 @@ func BuildDocumentForServerSeq( return doc, nil } + +func validateClientSeqSequential(changes []*change.Change) error { + if len(changes) <= 1 { + return nil + } + + nextClientSeq := changes[0].ClientSeq() + for _, cn := range changes[1:] { + nextClientSeq++ + + if nextClientSeq != cn.ClientSeq() { + return fmt.Errorf( + "ClientSeq in Changes are not sequential (expected: %d, actual: %d) : %w", + nextClientSeq, + cn.ClientSeq(), + ErrClientSeqNotSequential, + ) + } + } + return nil +} From c5abdcdeb17e09d1553572b150bd140ddcf235b8 Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sat, 10 Aug 2024 02:52:53 +0900 Subject: [PATCH 03/18] Add ErrClientSeqNotSequential to errorToCode Mappers in yorkie.connect --- server/rpc/connecthelper/status.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/rpc/connecthelper/status.go b/server/rpc/connecthelper/status.go index 8708b401b..c7f499bef 100644 --- a/server/rpc/connecthelper/status.go +++ b/server/rpc/connecthelper/status.go @@ -48,6 +48,7 @@ var errorToConnectCode = map[error]connect.Code{ clients.ErrInvalidClientKey: connect.CodeInvalidArgument, key.ErrInvalidKey: connect.CodeInvalidArgument, types.ErrEmptyProjectFields: connect.CodeInvalidArgument, + packs.ErrClientSeqNotSequential: connect.CodeInvalidArgument, // NotFound means the requested resource does not exist. database.ErrProjectNotFound: connect.CodeNotFound, @@ -100,6 +101,7 @@ var errorToCode = map[error]string{ clients.ErrInvalidClientKey: "ErrInvalidClientKey", key.ErrInvalidKey: "ErrInvalidKey", types.ErrEmptyProjectFields: "ErrEmptyProjectFields", + packs.ErrClientSeqNotSequential: "ErrClientSeqNotSequential", database.ErrProjectNotFound: "ErrProjectNotFound", database.ErrClientNotFound: "ErrClientNotFound", From 62e7f3d99eca835a99d3c2bd0a68512af7b498d4 Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sat, 10 Aug 2024 03:01:11 +0900 Subject: [PATCH 04/18] Write 'packs' test code for the following cases: sequential ClientSeq, non-sequential ClientSeq, ClientSeq greater than ClientInfo's ClientSeq, and ServerSeq greater than DocInfo's ServerSeq. --- server/packs/packs_test.go | 226 +++++++++++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 server/packs/packs_test.go diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go new file mode 100644 index 000000000..3b855df4d --- /dev/null +++ b/server/packs/packs_test.go @@ -0,0 +1,226 @@ +package packs_test + +import ( + "context" + "github.com/stretchr/testify/assert" + "github.com/yorkie-team/yorkie/api/converter" + "github.com/yorkie-team/yorkie/api/types" + api "github.com/yorkie-team/yorkie/api/yorkie/v1" + "github.com/yorkie-team/yorkie/pkg/document" + "github.com/yorkie-team/yorkie/pkg/document/change" + "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/server/backend" + "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/clients" + "github.com/yorkie-team/yorkie/server/documents" + "github.com/yorkie-team/yorkie/server/packs" + "github.com/yorkie-team/yorkie/server/profiling/prometheus" + "github.com/yorkie-team/yorkie/server/rpc/connecthelper" + "github.com/yorkie-team/yorkie/test/helper" + "testing" +) + +var ( + clientId = "000000000000000000000001" +) + +func Test(t *testing.T) { + t.Run("test", func(t *testing.T) { + RunPushPullWithSequentialClientSeqTest(t) + }) + + t.Run("test", func(t *testing.T) { + RunPushPullWithNotSequentialClientSeqTest(t) + }) + + t.Run("test", func(t *testing.T) { + RunPushPullWithClientSeqGreaterThanClientInfoTest(t) + }) + + t.Run("test", func(t *testing.T) { + RunPushPullWithServerSeqGreaterThanDocInfoTest(t) + }) +} + +func RunPushPullWithSequentialClientSeqTest(t *testing.T) { + // given + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientId) + actorID, _ := time.ActorIDFromHex(clientId) + + changePackWithSequentialClientSeq, _ := createChangePackWithSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, changePackWithSequentialClientSeq.DocumentKey, true) + clientInfo.AttachDocument(docInfo.ID, changePackWithSequentialClientSeq.IsAttached()) + + _, err := packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackWithSequentialClientSeq, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + assert.NoError(t, err) +} + +func RunPushPullWithNotSequentialClientSeqTest(t *testing.T) { + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientId) + + actorID, _ := time.ActorIDFromHex(clientId) + changePackWithNotSequentialClientSeq, _ := createChangePackWithNotSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, changePackWithNotSequentialClientSeq.DocumentKey, true) + clientInfo.AttachDocument(docInfo.ID, changePackWithNotSequentialClientSeq.IsAttached()) + + _, err := packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackWithNotSequentialClientSeq, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + + assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqNotSequential), connecthelper.CodeOf(err)) +} + +func RunPushPullWithClientSeqGreaterThanClientInfoTest(t *testing.T) { + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientId) + + actorID, _ := time.ActorIDFromHex(clientId) + changePackFixture, _ := createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, changePackFixture.DocumentKey, true) + clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + + packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackFixture, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + + changePackWithClientSeqGreaterThanClientInfo, _ := createChangePackWithClientSeqGreaterThanClientInfo(helper.TestDocKey(t).String(), actorID.Bytes()) + _, err := packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackWithClientSeqGreaterThanClientInfo, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + + assert.NoError(t, err) +} + +func RunPushPullWithServerSeqGreaterThanDocInfoTest(t *testing.T) { + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientId) + + actorID, _ := time.ActorIDFromHex(clientId) + changePackFixture, _ := createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, changePackFixture.DocumentKey, true) + clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + + packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackFixture, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + + changePackWithServerSeqGreaterThanDocInfo, _ := createChangePackWithServerSeqGreaterThanDocInfo(helper.TestDocKey(t).String()) + + _, err := packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackWithServerSeqGreaterThanDocInfo, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + + assert.Equal(t, connecthelper.CodeOf(packs.ErrInvalidServerSeq), connecthelper.CodeOf(err)) +} + +func createChangePackWithSequentialClientSeq(documentKey string, actorId []byte) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 0}, + Changes: []*api.Change{ + createChange(0, 0, actorId), + createChange(1, 1, actorId), + createChange(2, 2, actorId), + }, + }) +} + +func createChangePackWithNotSequentialClientSeq(documentKey string, actorId []byte) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 0}, + Changes: []*api.Change{ + createChange(2, 2, actorId), + createChange(1, 1, actorId), + createChange(0, 0, actorId), + }, + }) +} + +func createChangePackWithClientSeqGreaterThanClientInfo(documentKey string, actorId []byte) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 2, ClientSeq: 1e9}, + Changes: []*api.Change{ + createChange(1e9, 1e9, actorId), + }, + }) +} + +func createChangePackFixture(documentKey string, actorId []byte) (*change.Pack, error) { + return createChangePackWithSequentialClientSeq(documentKey, actorId) +} + +func createChangePackWithServerSeqGreaterThanDocInfo(documentKey string) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 1e9, ClientSeq: 2}, + }) +} + +func createChange(clientSeq uint32, lamport int64, actorId []byte) *api.Change { + return &api.Change{ + Id: &api.ChangeID{ + ClientSeq: clientSeq, + Lamport: lamport, + ActorId: actorId, + }, + } +} + +func setUpBackend( + t *testing.T, +) *backend.Backend { + conf := helper.TestConfig() + + metrics, err := prometheus.NewMetrics() + assert.NoError(t, err) + + be, err := backend.New( + conf.Backend, + conf.Mongo, + conf.Housekeeping, + metrics, + ) + assert.NoError(t, err) + + return be +} From 2b58d1d90bba48b117fcd8da8025d7fbc134e29a Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sat, 10 Aug 2024 03:49:37 +0900 Subject: [PATCH 05/18] Fix linting issues in packs_test --- server/packs/packs_test.go | 158 ++++++++++++++++++++++--------------- 1 file changed, 96 insertions(+), 62 deletions(-) diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go index 3b855df4d..6e36a948b 100644 --- a/server/packs/packs_test.go +++ b/server/packs/packs_test.go @@ -21,7 +21,7 @@ import ( ) var ( - clientId = "000000000000000000000001" + clientID = "000000000000000000000001" ) func Test(t *testing.T) { @@ -51,18 +51,26 @@ func RunPushPullWithSequentialClientSeqTest(t *testing.T) { database.DefaultProjectID, ) - clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientId) - actorID, _ := time.ActorIDFromHex(clientId) + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) + actorID, _ := time.ActorIDFromHex(clientID) - changePackWithSequentialClientSeq, _ := createChangePackWithSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes()) + changePackWithSequentialClientSeq, _ := + createChangePackWithSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes()) - docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, changePackWithSequentialClientSeq.DocumentKey, true) - clientInfo.AttachDocument(docInfo.ID, changePackWithSequentialClientSeq.IsAttached()) + docInfo, _ := documents.FindDocInfoByKeyAndOwner( + ctx, be, clientInfo, changePackWithSequentialClientSeq.DocumentKey, true) + err := clientInfo.AttachDocument( + docInfo.ID, changePackWithSequentialClientSeq.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } - _, err := packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackWithSequentialClientSeq, packs.PushPullOptions{ - Mode: types.SyncModePushPull, - Status: document.StatusAttached, - }) + _, err = packs.PushPull( + ctx, be, project.ToProject(), clientInfo, docInfo, + changePackWithSequentialClientSeq, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) assert.NoError(t, err) } @@ -74,19 +82,26 @@ func RunPushPullWithNotSequentialClientSeqTest(t *testing.T) { database.DefaultProjectID, ) - clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientId) - - actorID, _ := time.ActorIDFromHex(clientId) - changePackWithNotSequentialClientSeq, _ := createChangePackWithNotSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes()) + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) - docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, changePackWithNotSequentialClientSeq.DocumentKey, true) - clientInfo.AttachDocument(docInfo.ID, changePackWithNotSequentialClientSeq.IsAttached()) + actorID, _ := time.ActorIDFromHex(clientID) + changePackWithNotSequentialClientSeq, _ := + createChangePackWithNotSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes()) - _, err := packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackWithNotSequentialClientSeq, packs.PushPullOptions{ - Mode: types.SyncModePushPull, - Status: document.StatusAttached, - }) + docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, + changePackWithNotSequentialClientSeq.DocumentKey, true) + err := clientInfo.AttachDocument( + docInfo.ID, changePackWithNotSequentialClientSeq.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } + _, err = packs.PushPull( + ctx, be, project.ToProject(), clientInfo, docInfo, + changePackWithNotSequentialClientSeq, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqNotSequential), connecthelper.CodeOf(err)) } @@ -98,24 +113,35 @@ func RunPushPullWithClientSeqGreaterThanClientInfoTest(t *testing.T) { database.DefaultProjectID, ) - clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientId) + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) - actorID, _ := time.ActorIDFromHex(clientId) - changePackFixture, _ := createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) + actorID, _ := time.ActorIDFromHex(clientID) + changePackFixture, _ := + createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) - docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, changePackFixture.DocumentKey, true) - clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + docInfo, _ := documents.FindDocInfoByKeyAndOwner( + ctx, be, clientInfo, changePackFixture.DocumentKey, true) + err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } - packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackFixture, packs.PushPullOptions{ - Mode: types.SyncModePushPull, - Status: document.StatusAttached, - }) + _, err = packs.PushPull(ctx, be, project.ToProject(), + clientInfo, docInfo, changePackFixture, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + if err != nil { + assert.Fail(t, "failed to push pull") + } - changePackWithClientSeqGreaterThanClientInfo, _ := createChangePackWithClientSeqGreaterThanClientInfo(helper.TestDocKey(t).String(), actorID.Bytes()) - _, err := packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackWithClientSeqGreaterThanClientInfo, packs.PushPullOptions{ - Mode: types.SyncModePushPull, - Status: document.StatusAttached, - }) + changePackWithClientSeqGreaterThanClientInfo, _ := + createChangePackWithClientSeqGreaterThanClientInfo(helper.TestDocKey(t).String(), actorID.Bytes()) + _, err = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, + changePackWithClientSeqGreaterThanClientInfo, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) assert.NoError(t, err) } @@ -128,65 +154,73 @@ func RunPushPullWithServerSeqGreaterThanDocInfoTest(t *testing.T) { database.DefaultProjectID, ) - clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientId) + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) - actorID, _ := time.ActorIDFromHex(clientId) - changePackFixture, _ := createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) + actorID, _ := time.ActorIDFromHex(clientID) + changePackFixture, _ := + createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) - docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, changePackFixture.DocumentKey, true) - clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + docInfo, _ := documents.FindDocInfoByKeyAndOwner( + ctx, be, clientInfo, changePackFixture.DocumentKey, true) + err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } - packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackFixture, packs.PushPullOptions{ - Mode: types.SyncModePushPull, - Status: document.StatusAttached, - }) + _, _ = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, + changePackFixture, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) - changePackWithServerSeqGreaterThanDocInfo, _ := createChangePackWithServerSeqGreaterThanDocInfo(helper.TestDocKey(t).String()) + changePackWithServerSeqGreaterThanDocInfo, _ := + createChangePackWithServerSeqGreaterThanDocInfo(helper.TestDocKey(t).String()) - _, err := packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, changePackWithServerSeqGreaterThanDocInfo, packs.PushPullOptions{ - Mode: types.SyncModePushPull, - Status: document.StatusAttached, - }) + _, err = packs.PushPull(ctx, be, project.ToProject(), + clientInfo, docInfo, changePackWithServerSeqGreaterThanDocInfo, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) assert.Equal(t, connecthelper.CodeOf(packs.ErrInvalidServerSeq), connecthelper.CodeOf(err)) } -func createChangePackWithSequentialClientSeq(documentKey string, actorId []byte) (*change.Pack, error) { +func createChangePackWithSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 0}, Changes: []*api.Change{ - createChange(0, 0, actorId), - createChange(1, 1, actorId), - createChange(2, 2, actorId), + createChange(0, 0, actorID), + createChange(1, 1, actorID), + createChange(2, 2, actorID), }, }) } -func createChangePackWithNotSequentialClientSeq(documentKey string, actorId []byte) (*change.Pack, error) { +func createChangePackWithNotSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 0}, Changes: []*api.Change{ - createChange(2, 2, actorId), - createChange(1, 1, actorId), - createChange(0, 0, actorId), + createChange(2, 2, actorID), + createChange(1, 1, actorID), + createChange(0, 0, actorID), }, }) } -func createChangePackWithClientSeqGreaterThanClientInfo(documentKey string, actorId []byte) (*change.Pack, error) { +func createChangePackWithClientSeqGreaterThanClientInfo(documentKey string, actorID []byte) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, Checkpoint: &api.Checkpoint{ServerSeq: 2, ClientSeq: 1e9}, Changes: []*api.Change{ - createChange(1e9, 1e9, actorId), + createChange(1e9, 1e9, actorID), }, }) } -func createChangePackFixture(documentKey string, actorId []byte) (*change.Pack, error) { - return createChangePackWithSequentialClientSeq(documentKey, actorId) +func createChangePackFixture(documentKey string, actorID []byte) (*change.Pack, error) { + return createChangePackWithSequentialClientSeq(documentKey, actorID) } func createChangePackWithServerSeqGreaterThanDocInfo(documentKey string) (*change.Pack, error) { @@ -196,12 +230,12 @@ func createChangePackWithServerSeqGreaterThanDocInfo(documentKey string) (*chang }) } -func createChange(clientSeq uint32, lamport int64, actorId []byte) *api.Change { +func createChange(clientSeq uint32, lamport int64, actorID []byte) *api.Change { return &api.Change{ Id: &api.ChangeID{ ClientSeq: clientSeq, Lamport: lamport, - ActorId: actorId, + ActorId: actorID, }, } } From 6d13dc92197ff12ced6871c8d48bc240f75aa782 Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sat, 10 Aug 2024 14:26:03 +0900 Subject: [PATCH 06/18] Fix goimports issues in packs_test --- server/packs/packs_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go index 6e36a948b..89d5cf39e 100644 --- a/server/packs/packs_test.go +++ b/server/packs/packs_test.go @@ -2,6 +2,8 @@ package packs_test import ( "context" + "testing" + "github.com/stretchr/testify/assert" "github.com/yorkie-team/yorkie/api/converter" "github.com/yorkie-team/yorkie/api/types" @@ -17,7 +19,6 @@ import ( "github.com/yorkie-team/yorkie/server/profiling/prometheus" "github.com/yorkie-team/yorkie/server/rpc/connecthelper" "github.com/yorkie-team/yorkie/test/helper" - "testing" ) var ( From a34819288bfca20f03c7fae290920e37c3773455 Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sat, 10 Aug 2024 16:05:32 +0900 Subject: [PATCH 07/18] Fix goimports issues in packs_test --- server/packs/packs_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go index 89d5cf39e..e56fde1e3 100644 --- a/server/packs/packs_test.go +++ b/server/packs/packs_test.go @@ -1,7 +1,6 @@ package packs_test import ( - "context" "testing" "github.com/stretchr/testify/assert" @@ -19,6 +18,7 @@ import ( "github.com/yorkie-team/yorkie/server/profiling/prometheus" "github.com/yorkie-team/yorkie/server/rpc/connecthelper" "github.com/yorkie-team/yorkie/test/helper" + "golang.org/x/net/context" ) var ( From e4474718da1dc325ab7348f79f33b5f3b1e1aefb Mon Sep 17 00:00:00 2001 From: binary-ho Date: Sat, 10 Aug 2024 22:09:07 +0900 Subject: [PATCH 08/18] Rewrite test description --- server/packs/packs_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go index e56fde1e3..8f20f1037 100644 --- a/server/packs/packs_test.go +++ b/server/packs/packs_test.go @@ -26,25 +26,24 @@ var ( ) func Test(t *testing.T) { - t.Run("test", func(t *testing.T) { + t.Run("push/pull sequential ClientSeq test", func(t *testing.T) { RunPushPullWithSequentialClientSeqTest(t) }) - t.Run("test", func(t *testing.T) { + t.Run("push/pull not sequential ClientSeq test", func(t *testing.T) { RunPushPullWithNotSequentialClientSeqTest(t) }) - t.Run("test", func(t *testing.T) { + t.Run("push/pull ClientSeq greater than ClinentInfo's ClientSeq", func(t *testing.T) { RunPushPullWithClientSeqGreaterThanClientInfoTest(t) }) - t.Run("test", func(t *testing.T) { + t.Run("push/pull ServerSeq greater than DocInfo's ServerSeq", func(t *testing.T) { RunPushPullWithServerSeqGreaterThanDocInfoTest(t) }) } func RunPushPullWithSequentialClientSeqTest(t *testing.T) { - // given ctx := context.Background() be := setUpBackend(t) project, _ := be.DB.FindProjectInfoByID( From 59392e09e2344d0bed9ce4d5211d155567a3a50b Mon Sep 17 00:00:00 2001 From: binary-ho Date: Sat, 10 Aug 2024 22:27:48 +0900 Subject: [PATCH 09/18] Fix goimports issues in packs_test.go --- server/packs/packs_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go index 8f20f1037..95486b61d 100644 --- a/server/packs/packs_test.go +++ b/server/packs/packs_test.go @@ -4,6 +4,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + "github.com/yorkie-team/yorkie/api/converter" "github.com/yorkie-team/yorkie/api/types" api "github.com/yorkie-team/yorkie/api/yorkie/v1" @@ -18,7 +20,6 @@ import ( "github.com/yorkie-team/yorkie/server/profiling/prometheus" "github.com/yorkie-team/yorkie/server/rpc/connecthelper" "github.com/yorkie-team/yorkie/test/helper" - "golang.org/x/net/context" ) var ( From 7712bb8e893b8e6fd3bac5c699e2ee5241e56e47 Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sat, 10 Aug 2024 23:33:06 +0900 Subject: [PATCH 10/18] Fix push/pull test where ClientSeq is less than ClientInfo's ClientSeq --- server/packs/packs_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go index 95486b61d..44afcabbc 100644 --- a/server/packs/packs_test.go +++ b/server/packs/packs_test.go @@ -27,7 +27,7 @@ var ( ) func Test(t *testing.T) { - t.Run("push/pull sequential ClientSeq test", func(t *testing.T) { + t.Run("push/pull sequential ClientSeq test (happy case)", func(t *testing.T) { RunPushPullWithSequentialClientSeqTest(t) }) @@ -35,8 +35,8 @@ func Test(t *testing.T) { RunPushPullWithNotSequentialClientSeqTest(t) }) - t.Run("push/pull ClientSeq greater than ClinentInfo's ClientSeq", func(t *testing.T) { - RunPushPullWithClientSeqGreaterThanClientInfoTest(t) + t.Run("push/pull ClientSeq less than ClientInfo's ClientSeq (duplicated request)", func(t *testing.T) { + RunPushPullWithClientSeqLessThanClientInfoTest(t) }) t.Run("push/pull ServerSeq greater than DocInfo's ServerSeq", func(t *testing.T) { @@ -106,7 +106,7 @@ func RunPushPullWithNotSequentialClientSeqTest(t *testing.T) { assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqNotSequential), connecthelper.CodeOf(err)) } -func RunPushPullWithClientSeqGreaterThanClientInfoTest(t *testing.T) { +func RunPushPullWithClientSeqLessThanClientInfoTest(t *testing.T) { ctx := context.Background() be := setUpBackend(t) project, _ := be.DB.FindProjectInfoByID( @@ -136,10 +136,10 @@ func RunPushPullWithClientSeqGreaterThanClientInfoTest(t *testing.T) { assert.Fail(t, "failed to push pull") } - changePackWithClientSeqGreaterThanClientInfo, _ := - createChangePackWithClientSeqGreaterThanClientInfo(helper.TestDocKey(t).String(), actorID.Bytes()) + changePackWithClientSeqLessThanClientInfo, _ := + createChangePackWithClientSeqLessThanClientInfo(helper.TestDocKey(t).String(), actorID.Bytes()) _, err = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, - changePackWithClientSeqGreaterThanClientInfo, packs.PushPullOptions{ + changePackWithClientSeqLessThanClientInfo, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusAttached, }) @@ -189,7 +189,7 @@ func RunPushPullWithServerSeqGreaterThanDocInfoTest(t *testing.T) { func createChangePackWithSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, - Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 0}, + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 2}, Changes: []*api.Change{ createChange(0, 0, actorID), createChange(1, 1, actorID), @@ -210,12 +210,12 @@ func createChangePackWithNotSequentialClientSeq(documentKey string, actorID []by }) } -func createChangePackWithClientSeqGreaterThanClientInfo(documentKey string, actorID []byte) (*change.Pack, error) { +func createChangePackWithClientSeqLessThanClientInfo(documentKey string, actorID []byte) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, - Checkpoint: &api.Checkpoint{ServerSeq: 2, ClientSeq: 1e9}, + Checkpoint: &api.Checkpoint{ServerSeq: 2, ClientSeq: 0}, Changes: []*api.Change{ - createChange(1e9, 1e9, actorID), + createChange(0, 0, actorID), }, }) } From be485a72a681213b11f30f1c71efa4bb5cd36dbb Mon Sep 17 00:00:00 2001 From: binary-ho Date: Sun, 11 Aug 2024 00:23:02 +0900 Subject: [PATCH 11/18] Fix goimports issue at health_test.go --- test/integration/health_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/integration/health_test.go b/test/integration/health_test.go index 1b6160334..c6a6f787e 100644 --- a/test/integration/health_test.go +++ b/test/integration/health_test.go @@ -26,14 +26,15 @@ import ( "connectrpc.com/grpchealth" "github.com/stretchr/testify/assert" - "github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect" - "github.com/yorkie-team/yorkie/server/rpc/httphealth" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" healthpb "google.golang.org/grpc/health/grpc_health_v1" "github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect" "github.com/yorkie-team/yorkie/server/rpc/httphealth" + + "github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect" + "github.com/yorkie-team/yorkie/server/rpc/httphealth" ) var services = []string{ From b2472f49aa08f4637952f2be406f36bbbd510994 Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sun, 11 Aug 2024 00:42:59 +0900 Subject: [PATCH 12/18] Rollback health_test.go --- test/integration/health_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/integration/health_test.go b/test/integration/health_test.go index c6a6f787e..eb0f7713c 100644 --- a/test/integration/health_test.go +++ b/test/integration/health_test.go @@ -32,9 +32,6 @@ import ( "github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect" "github.com/yorkie-team/yorkie/server/rpc/httphealth" - - "github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect" - "github.com/yorkie-team/yorkie/server/rpc/httphealth" ) var services = []string{ From 511f8f1119dadd6a28eadd04a89fa8849751d0cd Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sun, 11 Aug 2024 19:18:13 +0900 Subject: [PATCH 13/18] Add Validation ClientSeq is Sequential With DocInfo.Checkpoint.ClientSeq --- server/packs/packs.go | 39 ++++++++++++++++++++++----- server/rpc/connecthelper/status.go | 42 ++++++++++++++++-------------- 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/server/packs/packs.go b/server/packs/packs.go index 193932743..a672072c4 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -39,8 +39,11 @@ import ( ) var ( - // ErrClientSeqNotSequential is returned when ClientSeq in reqPack.Changes are not sequential - ErrClientSeqNotSequential = errors.New("ClientSeq in reqPack.Changes are not sequential") + // ErrClientSeqNotSequentialWithCheckpoint is returned when ClientSeq in reqPack are not sequential With DocInfo.Checkpoint.ClientSeq + ErrClientSeqNotSequentialWithCheckpoint = errors.New("ClientSeq is not sequential with DocInfo.Checkpoint.ClientSeq") + + // ErrClientSeqInChangesAreNotSequential is returned when ClientSeq in reqPack.Changes are not sequential + ErrClientSeqInChangesAreNotSequential = errors.New("ClientSeq in reqPack.Changes are not sequential") ) // PushPullKey creates a new sync.Key of PushPull for the given document. @@ -83,7 +86,8 @@ func PushPull( // TODO: Changes may be reordered or missing during communication on the network. // We should check the change.pack with checkpoint to make sure the changes are in the correct order. - err := validateClientSeqSequential(reqPack.Changes) + checkpoint := clientInfo.Checkpoint(docInfo.ID) + err := validateClientSeqSequential(reqPack.Changes, checkpoint) if err != nil { return nil, err } @@ -284,11 +288,34 @@ func BuildDocumentForServerSeq( return doc, nil } -func validateClientSeqSequential(changes []*change.Change) error { - if len(changes) <= 1 { +func validateClientSeqSequential(changes []*change.Change, checkpoint change.Checkpoint) error { + if len(changes) < 1 { return nil } + if err := validateClientSeqSequentialWithCheckpoint(changes, checkpoint); err != nil { + return err + } + + return validateClientSeqInChangesAreSequential(changes) +} + +func validateClientSeqSequentialWithCheckpoint(changes []*change.Change, checkpoint change.Checkpoint) error { + expectedClientSeq := checkpoint.ClientSeq + 1 + actualFirstClientSeq := changes[0].ClientSeq() + + if expectedClientSeq != actualFirstClientSeq { + return fmt.Errorf( + "ClientSeq is not sequential with DocInfo.Checkpoint.ClientSeq (expected: %d, actual: %d) : %w", + expectedClientSeq, + actualFirstClientSeq, + ErrClientSeqNotSequentialWithCheckpoint, + ) + } + return nil +} + +func validateClientSeqInChangesAreSequential(changes []*change.Change) error { nextClientSeq := changes[0].ClientSeq() for _, cn := range changes[1:] { nextClientSeq++ @@ -298,7 +325,7 @@ func validateClientSeqSequential(changes []*change.Change) error { "ClientSeq in Changes are not sequential (expected: %d, actual: %d) : %w", nextClientSeq, cn.ClientSeq(), - ErrClientSeqNotSequential, + ErrClientSeqInChangesAreNotSequential, ) } } diff --git a/server/rpc/connecthelper/status.go b/server/rpc/connecthelper/status.go index c7f499bef..e1ca7538c 100644 --- a/server/rpc/connecthelper/status.go +++ b/server/rpc/connecthelper/status.go @@ -39,16 +39,17 @@ import ( // errorToConnectCode maps an error to connectRPC status code. var errorToConnectCode = map[error]connect.Code{ // InvalidArgument means the request is malformed. - converter.ErrPackRequired: connect.CodeInvalidArgument, - converter.ErrCheckpointRequired: connect.CodeInvalidArgument, - time.ErrInvalidHexString: connect.CodeInvalidArgument, - time.ErrInvalidActorID: connect.CodeInvalidArgument, - types.ErrInvalidID: connect.CodeInvalidArgument, - clients.ErrInvalidClientID: connect.CodeInvalidArgument, - clients.ErrInvalidClientKey: connect.CodeInvalidArgument, - key.ErrInvalidKey: connect.CodeInvalidArgument, - types.ErrEmptyProjectFields: connect.CodeInvalidArgument, - packs.ErrClientSeqNotSequential: connect.CodeInvalidArgument, + converter.ErrPackRequired: connect.CodeInvalidArgument, + converter.ErrCheckpointRequired: connect.CodeInvalidArgument, + time.ErrInvalidHexString: connect.CodeInvalidArgument, + time.ErrInvalidActorID: connect.CodeInvalidArgument, + types.ErrInvalidID: connect.CodeInvalidArgument, + clients.ErrInvalidClientID: connect.CodeInvalidArgument, + clients.ErrInvalidClientKey: connect.CodeInvalidArgument, + key.ErrInvalidKey: connect.CodeInvalidArgument, + types.ErrEmptyProjectFields: connect.CodeInvalidArgument, + packs.ErrClientSeqNotSequentialWithCheckpoint: connect.CodeInvalidArgument, + packs.ErrClientSeqInChangesAreNotSequential: connect.CodeInvalidArgument, // NotFound means the requested resource does not exist. database.ErrProjectNotFound: connect.CodeNotFound, @@ -92,16 +93,17 @@ var errorToConnectCode = map[error]connect.Code{ // TODO(hackerwins): We need to add codes by hand for each error. It would be // better to generate this map automatically. var errorToCode = map[error]string{ - converter.ErrPackRequired: "ErrPackRequired", - converter.ErrCheckpointRequired: "ErrCheckpointRequired", - time.ErrInvalidHexString: "ErrInvalidHexString", - time.ErrInvalidActorID: "ErrInvalidActorID", - types.ErrInvalidID: "ErrInvalidID", - clients.ErrInvalidClientID: "ErrInvalidClientID", - clients.ErrInvalidClientKey: "ErrInvalidClientKey", - key.ErrInvalidKey: "ErrInvalidKey", - types.ErrEmptyProjectFields: "ErrEmptyProjectFields", - packs.ErrClientSeqNotSequential: "ErrClientSeqNotSequential", + converter.ErrPackRequired: "ErrPackRequired", + converter.ErrCheckpointRequired: "ErrCheckpointRequired", + time.ErrInvalidHexString: "ErrInvalidHexString", + time.ErrInvalidActorID: "ErrInvalidActorID", + types.ErrInvalidID: "ErrInvalidID", + clients.ErrInvalidClientID: "ErrInvalidClientID", + clients.ErrInvalidClientKey: "ErrInvalidClientKey", + key.ErrInvalidKey: "ErrInvalidKey", + types.ErrEmptyProjectFields: "ErrEmptyProjectFields", + packs.ErrClientSeqNotSequentialWithCheckpoint: "ErrClientSeqNotSequentialWithCheckpoint", + packs.ErrClientSeqInChangesAreNotSequential: "ErrClientSeqInChangesAreNotSequential", database.ErrProjectNotFound: "ErrProjectNotFound", database.ErrClientNotFound: "ErrClientNotFound", From 1917a7b71955c0a571f1c1f44a9cbfc77ba1c4ca Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sun, 11 Aug 2024 20:31:41 +0900 Subject: [PATCH 14/18] Write Test Code about validate ClientSeq is sequential with DocInfo.Checkpoint.ClientSeq --- server/packs/packs_test.go | 80 +++++++++++++++++++++++++++++++------- 1 file changed, 67 insertions(+), 13 deletions(-) diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go index 44afcabbc..24f626f5b 100644 --- a/server/packs/packs_test.go +++ b/server/packs/packs_test.go @@ -31,8 +31,12 @@ func Test(t *testing.T) { RunPushPullWithSequentialClientSeqTest(t) }) - t.Run("push/pull not sequential ClientSeq test", func(t *testing.T) { - RunPushPullWithNotSequentialClientSeqTest(t) + t.Run("push/pull not sequential ClientSeq with DocInfo.Checkpoint.ClientSeq test", func(t *testing.T) { + RunPushPullWithNotSequentialClientSeqWithCheckpoint(t) + }) + + t.Run("push/pull not sequential ClientSeq in changes test", func(t *testing.T) { + RunPushPullWithNotSequentialClientSeqInChangesTest(t) }) t.Run("push/pull ClientSeq less than ClientInfo's ClientSeq (duplicated request)", func(t *testing.T) { @@ -75,7 +79,7 @@ func RunPushPullWithSequentialClientSeqTest(t *testing.T) { assert.NoError(t, err) } -func RunPushPullWithNotSequentialClientSeqTest(t *testing.T) { +func RunPushPullWithNotSequentialClientSeqInChangesTest(t *testing.T) { ctx := context.Background() be := setUpBackend(t) project, _ := be.DB.FindProjectInfoByID( @@ -87,7 +91,7 @@ func RunPushPullWithNotSequentialClientSeqTest(t *testing.T) { actorID, _ := time.ActorIDFromHex(clientID) changePackWithNotSequentialClientSeq, _ := - createChangePackWithNotSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes()) + createChangePackWithNotSequentialClientSeqInChanges(helper.TestDocKey(t).String(), actorID.Bytes()) docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, changePackWithNotSequentialClientSeq.DocumentKey, true) @@ -103,7 +107,47 @@ func RunPushPullWithNotSequentialClientSeqTest(t *testing.T) { Mode: types.SyncModePushPull, Status: document.StatusAttached, }) - assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqNotSequential), connecthelper.CodeOf(err)) + assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqInChangesAreNotSequential), connecthelper.CodeOf(err)) +} + +func RunPushPullWithNotSequentialClientSeqWithCheckpoint(t *testing.T) { + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) + + actorID, _ := time.ActorIDFromHex(clientID) + changePackFixture, _ := + createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner( + ctx, be, clientInfo, changePackFixture.DocumentKey, true) + err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } + + _, err = packs.PushPull(ctx, be, project.ToProject(), + clientInfo, docInfo, changePackFixture, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + if err != nil { + assert.Fail(t, "failed to push pull") + } + + changePackWithNotSequentialClientSeqWithCheckpoint, _ := + createChangePackWithNotSequentialClientSeqWithCheckpoint(helper.TestDocKey(t).String(), actorID.Bytes()) + _, err = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, + changePackWithNotSequentialClientSeqWithCheckpoint, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqNotSequentialWithCheckpoint), connecthelper.CodeOf(err)) } func RunPushPullWithClientSeqLessThanClientInfoTest(t *testing.T) { @@ -189,23 +233,33 @@ func RunPushPullWithServerSeqGreaterThanDocInfoTest(t *testing.T) { func createChangePackWithSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, - Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 2}, + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 3}, Changes: []*api.Change{ - createChange(0, 0, actorID), createChange(1, 1, actorID), createChange(2, 2, actorID), + createChange(3, 3, actorID), }, }) } -func createChangePackWithNotSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) { +func createChangePackWithNotSequentialClientSeqInChanges(documentKey string, actorID []byte) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, - Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 0}, + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 3}, Changes: []*api.Change{ - createChange(2, 2, actorID), createChange(1, 1, actorID), - createChange(0, 0, actorID), + createChange(3, 3, actorID), + createChange(2, 2, actorID), + }, + }) +} + +func createChangePackWithNotSequentialClientSeqWithCheckpoint(documentKey string, actorID []byte) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 3, ClientSeq: 1e9}, + Changes: []*api.Change{ + createChange(1e9, 1e9, actorID), }, }) } @@ -213,7 +267,7 @@ func createChangePackWithNotSequentialClientSeq(documentKey string, actorID []by func createChangePackWithClientSeqLessThanClientInfo(documentKey string, actorID []byte) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, - Checkpoint: &api.Checkpoint{ServerSeq: 2, ClientSeq: 0}, + Checkpoint: &api.Checkpoint{ServerSeq: 3, ClientSeq: 3}, Changes: []*api.Change{ createChange(0, 0, actorID), }, @@ -227,7 +281,7 @@ func createChangePackFixture(documentKey string, actorID []byte) (*change.Pack, func createChangePackWithServerSeqGreaterThanDocInfo(documentKey string) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, - Checkpoint: &api.Checkpoint{ServerSeq: 1e9, ClientSeq: 2}, + Checkpoint: &api.Checkpoint{ServerSeq: 1e9, ClientSeq: 3}, }) } From 38cc7ed88126df1b2a0f63862176342e52204c3b Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sun, 11 Aug 2024 20:32:30 +0900 Subject: [PATCH 15/18] Fix validate ClientSeq Sequential With Checkpoint --- server/packs/packs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/packs/packs.go b/server/packs/packs.go index a672072c4..d32b2f977 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -304,7 +304,7 @@ func validateClientSeqSequentialWithCheckpoint(changes []*change.Change, checkpo expectedClientSeq := checkpoint.ClientSeq + 1 actualFirstClientSeq := changes[0].ClientSeq() - if expectedClientSeq != actualFirstClientSeq { + if expectedClientSeq < actualFirstClientSeq { return fmt.Errorf( "ClientSeq is not sequential with DocInfo.Checkpoint.ClientSeq (expected: %d, actual: %d) : %w", expectedClientSeq, From 4871d922d2e5ce295b47515814a897adbbce3243 Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sun, 11 Aug 2024 20:34:29 +0900 Subject: [PATCH 16/18] Remove TODO comment about needs of Changes validate --- server/packs/packs.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/packs/packs.go b/server/packs/packs.go index d32b2f977..7ce6fc5ef 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -84,8 +84,6 @@ func PushPull( be.Metrics.ObservePushPullResponseSeconds(gotime.Since(start).Seconds()) }() - // TODO: Changes may be reordered or missing during communication on the network. - // We should check the change.pack with checkpoint to make sure the changes are in the correct order. checkpoint := clientInfo.Checkpoint(docInfo.ID) err := validateClientSeqSequential(reqPack.Changes, checkpoint) if err != nil { From a32137e0bf11f3a826c43ae0da43a8754f593d8b Mon Sep 17 00:00:00 2001 From: binary_ho Date: Sun, 11 Aug 2024 21:40:05 +0900 Subject: [PATCH 17/18] Fix lint issues: some lines too long --- server/packs/packs.go | 3 ++- server/packs/packs_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/packs/packs.go b/server/packs/packs.go index 7ce6fc5ef..6e2a95aeb 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -39,7 +39,8 @@ import ( ) var ( - // ErrClientSeqNotSequentialWithCheckpoint is returned when ClientSeq in reqPack are not sequential With DocInfo.Checkpoint.ClientSeq + // ErrClientSeqNotSequentialWithCheckpoint is returned + // when ClientSeq in change not sequential With DocInfo.Checkpoint.ClientSeq ErrClientSeqNotSequentialWithCheckpoint = errors.New("ClientSeq is not sequential with DocInfo.Checkpoint.ClientSeq") // ErrClientSeqInChangesAreNotSequential is returned when ClientSeq in reqPack.Changes are not sequential diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go index 24f626f5b..2daaed0a3 100644 --- a/server/packs/packs_test.go +++ b/server/packs/packs_test.go @@ -254,7 +254,8 @@ func createChangePackWithNotSequentialClientSeqInChanges(documentKey string, act }) } -func createChangePackWithNotSequentialClientSeqWithCheckpoint(documentKey string, actorID []byte) (*change.Pack, error) { +func createChangePackWithNotSequentialClientSeqWithCheckpoint( + documentKey string, actorID []byte) (*change.Pack, error) { return converter.FromChangePack(&api.ChangePack{ DocumentKey: documentKey, Checkpoint: &api.Checkpoint{ServerSeq: 3, ClientSeq: 1e9}, From 0217951ba8fdcb1ec766c5b337dec0e9722da7de Mon Sep 17 00:00:00 2001 From: binary_ho Date: Mon, 12 Aug 2024 23:19:30 +0900 Subject: [PATCH 18/18] Change to return `CodeFailedPrecondition` Status Code to the client in the case of `ErrClientSeqNotSequentialWithCheckpoint`. --- server/rpc/connecthelper/status.go | 36 +++++++++++++++--------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/server/rpc/connecthelper/status.go b/server/rpc/connecthelper/status.go index e1ca7538c..93a63270f 100644 --- a/server/rpc/connecthelper/status.go +++ b/server/rpc/connecthelper/status.go @@ -39,17 +39,16 @@ import ( // errorToConnectCode maps an error to connectRPC status code. var errorToConnectCode = map[error]connect.Code{ // InvalidArgument means the request is malformed. - converter.ErrPackRequired: connect.CodeInvalidArgument, - converter.ErrCheckpointRequired: connect.CodeInvalidArgument, - time.ErrInvalidHexString: connect.CodeInvalidArgument, - time.ErrInvalidActorID: connect.CodeInvalidArgument, - types.ErrInvalidID: connect.CodeInvalidArgument, - clients.ErrInvalidClientID: connect.CodeInvalidArgument, - clients.ErrInvalidClientKey: connect.CodeInvalidArgument, - key.ErrInvalidKey: connect.CodeInvalidArgument, - types.ErrEmptyProjectFields: connect.CodeInvalidArgument, - packs.ErrClientSeqNotSequentialWithCheckpoint: connect.CodeInvalidArgument, - packs.ErrClientSeqInChangesAreNotSequential: connect.CodeInvalidArgument, + converter.ErrPackRequired: connect.CodeInvalidArgument, + converter.ErrCheckpointRequired: connect.CodeInvalidArgument, + time.ErrInvalidHexString: connect.CodeInvalidArgument, + time.ErrInvalidActorID: connect.CodeInvalidArgument, + types.ErrInvalidID: connect.CodeInvalidArgument, + clients.ErrInvalidClientID: connect.CodeInvalidArgument, + clients.ErrInvalidClientKey: connect.CodeInvalidArgument, + key.ErrInvalidKey: connect.CodeInvalidArgument, + types.ErrEmptyProjectFields: connect.CodeInvalidArgument, + packs.ErrClientSeqInChangesAreNotSequential: connect.CodeInvalidArgument, // NotFound means the requested resource does not exist. database.ErrProjectNotFound: connect.CodeNotFound, @@ -64,13 +63,14 @@ var errorToConnectCode = map[error]connect.Code{ // FailedPrecondition means the request is rejected because the state of the // system is not the desired state. - database.ErrClientNotActivated: connect.CodeFailedPrecondition, - database.ErrDocumentNotAttached: connect.CodeFailedPrecondition, - database.ErrDocumentAlreadyAttached: connect.CodeFailedPrecondition, - database.ErrDocumentAlreadyDetached: connect.CodeFailedPrecondition, - documents.ErrDocumentAttached: connect.CodeFailedPrecondition, - packs.ErrInvalidServerSeq: connect.CodeFailedPrecondition, - database.ErrConflictOnUpdate: connect.CodeFailedPrecondition, + database.ErrClientNotActivated: connect.CodeFailedPrecondition, + database.ErrDocumentNotAttached: connect.CodeFailedPrecondition, + database.ErrDocumentAlreadyAttached: connect.CodeFailedPrecondition, + database.ErrDocumentAlreadyDetached: connect.CodeFailedPrecondition, + documents.ErrDocumentAttached: connect.CodeFailedPrecondition, + packs.ErrInvalidServerSeq: connect.CodeFailedPrecondition, + database.ErrConflictOnUpdate: connect.CodeFailedPrecondition, + packs.ErrClientSeqNotSequentialWithCheckpoint: connect.CodeFailedPrecondition, // Unimplemented means the server does not implement the functionality. converter.ErrUnsupportedOperation: connect.CodeUnimplemented,