diff --git a/server/embed/etcd.go b/server/embed/etcd.go index a40ef662499..518215669cb 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -84,6 +84,7 @@ type Etcd struct { errc chan error closeOnce sync.Once + wg sync.WaitGroup } type peerListener struct { @@ -109,7 +110,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if !serving { // errored before starting gRPC server for serveCtx.serversC for _, sctx := range e.sctxs { - close(sctx.serversC) + sctx.close() } } e.Close() @@ -456,6 +457,7 @@ func (e *Etcd) Close() { } } if e.errc != nil { + e.wg.Wait() close(e.errc) } } @@ -870,6 +872,9 @@ func (e *Etcd) serveMetrics() (err error) { } func (e *Etcd) errHandler(err error) { + e.wg.Add(1) + defer e.wg.Done() + if err != nil { e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err)) } diff --git a/server/embed/serve.go b/server/embed/serve.go index a73ba58b1b5..c888e3b1cfb 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "strings" + "sync" gw "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/soheilhy/cmux" @@ -66,6 +67,7 @@ type serveCtx struct { userHandlers map[string]http.Handler serviceRegister func(*grpc.Server) serversC chan *servers + closeOnce sync.Once } type servers struct { @@ -102,6 +104,9 @@ func (sctx *serveCtx) serve( ) (err error) { logger := defaultLog.New(io.Discard, "etcdhttp", 0) + // Make sure serversC is closed even if we prematurely exit the function. + defer sctx.close() + select { case <-s.StoppingNotify(): return errors.New("server is stopping") @@ -121,8 +126,6 @@ func (sctx *serveCtx) serve( servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) - // Make sure serversC is closed even if we prematurely exit the function. - defer close(sctx.serversC) var gwmux *gw.ServeMux if s.Cfg.EnableGRPCGateway { // GRPC gateway connects to grpc server via connection provided by grpc dial. @@ -521,3 +524,9 @@ func (sctx *serveCtx) registerTrace() { evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) } sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf)) } + +func (sctx *serveCtx) close() { + sctx.closeOnce.Do(func() { + close(sctx.serversC) + }) +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 778e786c3af..e6cee5b2650 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1776,6 +1776,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { ClientUrls: s.attributes.ClientURLs, }, } + // gofail: var beforePublishing struct{} lg := s.Logger() for { select { diff --git a/tests/integration/embed/embed_test.go b/tests/integration/embed/embed_test.go index 7a3d11f1cd4..242f2b6a7ca 100644 --- a/tests/integration/embed/embed_test.go +++ b/tests/integration/embed/embed_test.go @@ -219,3 +219,45 @@ func TestEmbedEtcdAutoCompactionRetentionRetained(t *testing.T) { assert.Equal(t, durationToCompare, autoCompactionRetention) e.Close() } + +func TestEmbedEtcdStopDuringBootstrapping(t *testing.T) { + integration2.BeforeTest(t, integration2.WithFailpoint("beforePublishing", `sleep("2s")`)) + + done := make(chan struct{}) + go func() { + defer close(done) + + cfg := embed.NewConfig() + urls := newEmbedURLs(false, 2) + setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]}) + cfg.Dir = filepath.Join(t.TempDir(), "embed-etcd") + + e, err := embed.StartEtcd(cfg) + if err != nil { + t.Errorf("Failed to start etcd, got error %v", err) + } + defer e.Close() + + go func() { + time.Sleep(time.Second) + e.Server.Stop() + t.Log("Stopped server during bootstrapping") + }() + + select { + case <-e.Server.ReadyNotify(): + t.Log("Server is ready!") + case <-e.Server.StopNotify(): + t.Log("Server is stopped") + case <-time.After(20 * time.Second): + e.Server.Stop() // trigger a shutdown + t.Error("Server took too long to start!") + } + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + t.Error("timeout in bootstrapping etcd") + } +}