Skip to content

Commit

Permalink
Add Server.Ready() to ensure assets are up-to-date
Browse files Browse the repository at this point in the history
Signed-off-by: Byron Ruth <[email protected]>
  • Loading branch information
bruth committed Oct 31, 2023
1 parent 5f39f14 commit 4b528fe
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 13 deletions.
10 changes: 2 additions & 8 deletions pkg/drivers/nats/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,13 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega
// TODO: limit the number of retries?
var retries int
for {
if ns.ReadyForConnections(5 * time.Second) {
if ns.Ready() {
logrus.Infof("embedded NATS server is ready for client connections")
break
}
retries++
logrus.Infof("waiting for embedded NATS server to be ready: %d", retries)
time.Sleep(100 * time.Millisecond)
}

// Use the local server's client URL.
Expand Down Expand Up @@ -173,13 +174,6 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega

logrus.Infof("bucket initialized: %s", config.bucket)

if ns != nil {
for !ns.JetStreamIsCurrent() || !ns.JetStreamIsStreamCurrent("$G", fmt.Sprintf("KV_%s", config.bucket)) {
logrus.Warnf("waiting for JetStream to be current")
time.Sleep(time.Second)
}
}

ekv := NewKeyValue(ctx, bucket, js)

// Reference the global logger, since it appears log levels are
Expand Down
5 changes: 1 addition & 4 deletions pkg/drivers/nats/server/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package server

import (
"net"
"time"
)

type Server interface {
Start()
Ready() bool
Shutdown()
ClientURL() string
ReadyForConnections(wait time.Duration) bool
JetStreamIsCurrent() bool
JetStreamIsStreamCurrent(account, stream string) bool
InProcessConn() (net.Conn, error)
}

Expand Down
52 changes: 51 additions & 1 deletion pkg/drivers/nats/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,65 @@
package server

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"

"github.com/nats-io/nats-server/v2/server"
"github.com/sirupsen/logrus"
)

const (
Embedded = true
)

type responseWriter struct {
code int
header http.Header
body *bytes.Buffer
}

func (w *responseWriter) Header() http.Header {
return w.header
}

func (w *responseWriter) Write(b []byte) (int, error) {
return w.body.Write(b)
}

func (w *responseWriter) WriteHeader(code int) {
w.code = code
}

type embeddedServer struct {
*server.Server
}

func (s *embeddedServer) Ready() bool {
rw := responseWriter{
header: http.Header{},
body: &bytes.Buffer{},
}

r := http.Request{
Method: "GET",
URL: &url.URL{
Path: "/healthz",
},
Header: http.Header{},
}

s.Server.HandleHealthz(&rw, &r)

var hs server.HealthStatus
json.NewDecoder(rw.body).Decode(&hs)
logrus.Debugf("embedded NATS server health: %#v", hs)

return hs.Status == "ok"
}

func New(c *Config) (Server, error) {
opts := &server.Options{}

Expand Down Expand Up @@ -52,5 +102,5 @@ func New(c *Config) (Server, error) {
srv.ConfigureLogger()
}

return srv, nil
return &embeddedServer{Server: srv}, nil
}

0 comments on commit 4b528fe

Please sign in to comment.