diff --git a/pkg/apiserver/server.go b/pkg/apiserver/server.go index 962f40ef5f..93f507612b 100644 --- a/pkg/apiserver/server.go +++ b/pkg/apiserver/server.go @@ -165,6 +165,15 @@ func Start(params *APIServerParams) { handlers.RegisterUnauthenticatedRoutes(handler, kotsStore, debugRouter, loggingRouter) + /********************************************************************** + * Websocket routes (only for embedded cluster) + **********************************************************************/ + + if util.IsEmbeddedCluster() { + wsRouter := r.NewRoute().Subrouter() + wsRouter.HandleFunc("/ec-ws", handler.ConnectToECWebsocket) + } + /********************************************************************** * KOTS token auth routes **********************************************************************/
diff --git a/pkg/handlers/debug.go b/pkg/handlers/debug.go
new file mode 100644
index 0000000000..add9c1eb36
--- /dev/null
+++ b/pkg/handlers/debug.go
@@ -0,0 +1,23 @@
+package handlers
+
+import (
+	"net/http"
+
+	"github.com/replicatedhq/kots/pkg/websocket"
+	websockettypes "github.com/replicatedhq/kots/pkg/websocket/types"
+)
+
+// DebugInfoResponse is the JSON payload served by GetDebugInfo.
+type DebugInfoResponse struct {
+	// WSClients holds the result of websocket.GetClients; it is encoded
+	// under the "wsClients" key.
+	WSClients map[string]websockettypes.WSClient `json:"wsClients"`
+}
+
+// GetDebugInfo replies with HTTP 200 and a DebugInfoResponse built from the
+// clients reported by websocket.GetClients.
+func (h *Handler) GetDebugInfo(w http.ResponseWriter, r *http.Request) {
+	JSON(w, http.StatusOK, DebugInfoResponse{
+		WSClients: websocket.GetClients(),
+	})
+}
diff --git a/pkg/handlers/handlers.go b/pkg/handlers/handlers.go index fa2c7fb08c..6d1e190c59 100644 --- a/pkg/handlers/handlers.go +++ b/pkg/handlers/handlers.go @@ -331,6 +331,10 @@ func RegisterSessionAuthRoutes(r *mux.Router, kotsStore store.Store, handler KOT r.Name("ChangePassword").Path("/api/v1/password/change").Methods("PUT"). HandlerFunc(middleware.EnforceAccess(policy.PasswordChange, handler.ChangePassword)) + // Debug info + r.Name("GetDebugInfo").Path("/api/v1/debug").Methods("GET"). 
+ HandlerFunc(middleware.EnforceAccess(policy.ClusterRead, handler.GetDebugInfo)) + // Upgrade service r.Name("StartUpgradeService").Path("/api/v1/app/{appSlug}/start-upgrade-service").Methods("POST"). HandlerFunc(middleware.EnforceAccess(policy.AppUpdate, handler.StartUpgradeService)) diff --git a/pkg/handlers/handlers_test.go b/pkg/handlers/handlers_test.go index 384330f919..c569dceebc 100644 --- a/pkg/handlers/handlers_test.go +++ b/pkg/handlers/handlers_test.go @@ -1429,6 +1429,16 @@ var HandlerPolicyTests = map[string][]HandlerPolicyTest{ ExpectStatus: http.StatusOK, }, }, + "GetDebugInfo": { + { + Roles: []rbactypes.Role{rbac.ClusterAdminRole}, + SessionRoles: []string{rbac.ClusterAdminRoleID}, + Calls: func(storeRecorder *mock_store.MockStoreMockRecorder, handlerRecorder *mock_handlers.MockKOTSHandlerMockRecorder) { + handlerRecorder.GetDebugInfo(gomock.Any(), gomock.Any()) + }, + ExpectStatus: http.StatusOK, + }, + }, // Upgrade Service "StartUpgradeService": { diff --git a/pkg/handlers/interface.go b/pkg/handlers/interface.go index fd5f36e779..e4e8085f88 100644 --- a/pkg/handlers/interface.go +++ b/pkg/handlers/interface.go @@ -164,7 +164,13 @@ type KOTSHandler interface { // Password change ChangePassword(w http.ResponseWriter, r *http.Request) + // Debug info + GetDebugInfo(w http.ResponseWriter, r *http.Request) + // Upgrade service StartUpgradeService(w http.ResponseWriter, r *http.Request) GetUpgradeServiceStatus(w http.ResponseWriter, r *http.Request) + + // EC Websocket + ConnectToECWebsocket(w http.ResponseWriter, r *http.Request) } diff --git a/pkg/handlers/mock/mock.go b/pkg/handlers/mock/mock.go index 7926488c29..e654f6696a 100644 --- a/pkg/handlers/mock/mock.go +++ b/pkg/handlers/mock/mock.go @@ -178,6 +178,18 @@ func (mr *MockKOTSHandlerMockRecorder) ConfirmEmbeddedClusterManagement(w, r int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConfirmEmbeddedClusterManagement", 
reflect.TypeOf((*MockKOTSHandler)(nil).ConfirmEmbeddedClusterManagement), w, r) } +// ConnectToECWebsocket mocks base method. +func (m *MockKOTSHandler) ConnectToECWebsocket(w http.ResponseWriter, r *http.Request) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ConnectToECWebsocket", w, r) +} + +// ConnectToECWebsocket indicates an expected call of ConnectToECWebsocket. +func (mr *MockKOTSHandlerMockRecorder) ConnectToECWebsocket(w, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConnectToECWebsocket", reflect.TypeOf((*MockKOTSHandler)(nil).ConnectToECWebsocket), w, r) +} + // CreateAppFromAirgap mocks base method. func (m *MockKOTSHandler) CreateAppFromAirgap(w http.ResponseWriter, r *http.Request) { m.ctrl.T.Helper() @@ -694,6 +706,18 @@ func (mr *MockKOTSHandlerMockRecorder) GetBackup(w, r interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBackup", reflect.TypeOf((*MockKOTSHandler)(nil).GetBackup), w, r) } +// GetDebugInfo mocks base method. +func (m *MockKOTSHandler) GetDebugInfo(w http.ResponseWriter, r *http.Request) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "GetDebugInfo", w, r) +} + +// GetDebugInfo indicates an expected call of GetDebugInfo. +func (mr *MockKOTSHandlerMockRecorder) GetDebugInfo(w, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDebugInfo", reflect.TypeOf((*MockKOTSHandler)(nil).GetDebugInfo), w, r) +} + // GetDownstreamOutput mocks base method. 
func (m *MockKOTSHandler) GetDownstreamOutput(w http.ResponseWriter, r *http.Request) { m.ctrl.T.Helper()
diff --git a/pkg/handlers/websocket.go b/pkg/handlers/websocket.go
new file mode 100644
index 0000000000..7e3180350d
--- /dev/null
+++ b/pkg/handlers/websocket.go
@@ -0,0 +1,39 @@
+package handlers
+
+import (
+	"net/http"
+
+	"github.com/pkg/errors"
+	"github.com/replicatedhq/kots/pkg/logger"
+	"github.com/replicatedhq/kots/pkg/websocket"
+)
+
+// ConnectToECWebsocketResponse is the JSON body returned when a websocket
+// connection request is rejected before the upgrade is attempted.
+type ConnectToECWebsocketResponse struct {
+	Error string `json:"error,omitempty"`
+}
+
+// ConnectToECWebsocket upgrades the request to a websocket connection for the
+// node named by the "nodeName" query parameter and blocks until
+// websocket.Connect returns.
+//
+// A missing node name is answered with 400 and a JSON error. A failure from
+// websocket.Connect is only logged: its sole error path is a failed upgrade,
+// and the gorilla Upgrader has by then either replied with an HTTP error
+// itself or hijacked and closed the connection, so writing a second response
+// here would be superfluous or invalid.
+func (h *Handler) ConnectToECWebsocket(w http.ResponseWriter, r *http.Request) {
+	nodeName := r.URL.Query().Get("nodeName")
+	if nodeName == "" {
+		response := ConnectToECWebsocketResponse{Error: "missing node name"}
+		logger.Error(errors.New(response.Error))
+		JSON(w, http.StatusBadRequest, response)
+		return
+	}
+
+	if err := websocket.Connect(w, r, nodeName); err != nil {
+		// Do not call JSON here; the upgrader already handled the response.
+		logger.Error(errors.Wrap(err, "failed to establish websocket connection"))
+	}
+}
diff --git a/pkg/websocket/types/types.go b/pkg/websocket/types/types.go
new file mode 100644
index 0000000000..f2d9a1c469
--- /dev/null
+++ b/pkg/websocket/types/types.go
@@ -0,0 +1,25 @@
+package types
+
+import (
+	"time"
+
+	"github.com/gorilla/websocket"
+)
+
+// WSClient describes one registered websocket connection together with the
+// most recent ping/pong exchanges recorded for it.
+type WSClient struct {
+	// Conn is the underlying connection; it is excluded from JSON output.
+	Conn         *websocket.Conn `json:"-"`
+	ConnectedAt  time.Time       `json:"connectedAt"`
+	LastPingSent PingPongInfo    `json:"lastPingSent"`
+	LastPongRecv PingPongInfo    `json:"lastPongRecv"`
+	LastPingRecv PingPongInfo    `json:"lastPingRecv"`
+	LastPongSent PingPongInfo    `json:"lastPongSent"`
+}
+
+// PingPongInfo records when a ping or pong was observed and its payload.
+type PingPongInfo struct {
+	Time    time.Time `json:"time"`
+	Message string    `json:"message"`
+}
diff --git a/pkg/websocket/websocket.go b/pkg/websocket/websocket.go
new file mode 100644
index 0000000000..e9691d631c
--- /dev/null
+++ b/pkg/websocket/websocket.go
@@ -0,0 +1,185 @@
+package websocket
+
+import (
+	"fmt"
+	"math/rand"
+	"net"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/gorilla/websocket"
+	"github.com/pkg/errors"
+	"github.com/replicatedhq/kots/pkg/logger"
+	"github.com/replicatedhq/kots/pkg/websocket/types"
+)
+
+var wsUpgrader = websocket.Upgrader{}
+
+// wsClients maps a node name to its registered connection. Every read and
+// write of the map must hold wsMutex.
+var wsClients = make(map[string]types.WSClient)
+var wsMutex = sync.Mutex{}
+
+// Connect upgrades the HTTP request to a websocket connection, registers it
+// under nodeName (closing and replacing any connection already registered for
+// that node) and then blocks reading from the connection until a read fails.
+// It returns an error only when the upgrade itself fails.
+func Connect(w http.ResponseWriter, r *http.Request, nodeName string) error {
+	conn, err := wsUpgrader.Upgrade(w, r, nil)
+	if err != nil {
+		return errors.Wrap(err, "failed to upgrade to websocket")
+	}
+	defer conn.Close()
+
+	conn.SetPingHandler(wsPingHandler(nodeName, conn))
+	conn.SetPongHandler(wsPongHandler(nodeName, conn))
+	conn.SetCloseHandler(wsCloseHandler(nodeName, conn))
+
+	// register the client
+	registerWSClient(nodeName, conn)
+
+	// ping client on a regular interval to make sure it's still connected;
+	// closing done stops the pinger as soon as the read loop below ends
+	done := make(chan struct{})
+	defer close(done)
+	go pingWSClient(nodeName, conn, done)
+
+	// listen to client messages
+	listenToWSClient(nodeName, conn)
+	return nil
+}
+
+// pingWSClient sends a ping with a random payload to conn every 5-20 seconds
+// and records it as the client's LastPingSent. It returns when done is closed
+// or when a ping fails on a connection that is closed or no longer registered.
+func pingWSClient(nodeName string, conn *websocket.Conn, done <-chan struct{}) {
+	for {
+		sleepDuration := time.Second * time.Duration(5+rand.Intn(16)) // 5-20 seconds
+		timer := time.NewTimer(sleepDuration)
+		select {
+		case <-done:
+			timer.Stop()
+			return
+		case <-timer.C:
+		}
+
+		pingMsg := fmt.Sprintf("%x", rand.Int())
+
+		if err := conn.WriteControl(websocket.PingMessage, []byte(pingMsg), time.Now().Add(1*time.Second)); err != nil {
+			if isWSConnClosed(nodeName, conn, err) {
+				removeWSClient(nodeName, conn, err)
+				return
+			}
+			logger.Debugf("Failed to send ping message to %s: %v", nodeName, err)
+			continue
+		}
+
+		updateWSClient(nodeName, conn, func(client *types.WSClient) {
+			client.LastPingSent = types.PingPongInfo{
+				Time:    time.Now(),
+				Message: pingMsg,
+			}
+		})
+	}
+}
+
+// listenToWSClient reads from conn until the first read error, then
+// unregisters the client. Reading is what drives the ping/pong/close handlers.
+// gorilla/websocket documents read errors as permanent, so the loop must not
+// retry after one.
+func listenToWSClient(nodeName string, conn *websocket.Conn) {
+	for {
+		if _, _, err := conn.ReadMessage(); err != nil {
+			removeWSClient(nodeName, conn, err)
+			return
+		}
+	}
+}
+
+// registerWSClient stores conn as the connection for nodeName, closing any
+// connection that was previously registered under the same name.
+func registerWSClient(nodeName string, conn *websocket.Conn) {
+	wsMutex.Lock()
+	defer wsMutex.Unlock()
+
+	if e, ok := wsClients[nodeName]; ok {
+		e.Conn.Close()
+	}
+
+	wsClients[nodeName] = types.WSClient{
+		Conn:        conn,
+		ConnectedAt: time.Now(),
+	}
+
+	logger.Infof("Registered new websocket for %s", nodeName)
+}
+
+// removeWSClient unregisters nodeName, but only while conn is still the
+// connection registered for it, so that a stale connection cannot evict the
+// one that replaced it. err is the reason and is only logged.
+func removeWSClient(nodeName string, conn *websocket.Conn, err error) {
+	wsMutex.Lock()
+	defer wsMutex.Unlock()
+
+	if client, ok := wsClients[nodeName]; !ok || client.Conn != conn {
+		return
+	}
+	logger.Infof("Websocket connection closed for %s: %v", nodeName, err)
+	delete(wsClients, nodeName)
+}
+
+// updateWSClient applies update to the entry registered for nodeName while
+// holding wsMutex. It does nothing when the node is not registered or is
+// registered with a different connection.
+func updateWSClient(nodeName string, conn *websocket.Conn, update func(client *types.WSClient)) {
+	wsMutex.Lock()
+	defer wsMutex.Unlock()
+
+	client, ok := wsClients[nodeName]
+	if !ok || client.Conn != conn {
+		return
+	}
+	update(&client)
+	wsClients[nodeName] = client
+}
+
+// wsPingHandler returns a ping handler that answers with a pong carrying the
+// same payload and records LastPingRecv (and LastPongSent when the pong was
+// written). The pong is written before taking wsMutex so that a slow peer
+// cannot stall the other clients. The handler always returns nil.
+func wsPingHandler(nodeName string, conn *websocket.Conn) func(message string) error {
+	return func(message string) error {
+		recvAt := time.Now()
+
+		pongErr := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(1*time.Second))
+		if pongErr != nil {
+			logger.Debugf("Failed to send pong message to %s: %v", nodeName, pongErr)
+		}
+		sentAt := time.Now()
+
+		updateWSClient(nodeName, conn, func(client *types.WSClient) {
+			client.LastPingRecv = types.PingPongInfo{
+				Time:    recvAt,
+				Message: message,
+			}
+			if pongErr == nil {
+				client.LastPongSent = types.PingPongInfo{
+					Time:    sentAt,
+					Message: message,
+				}
+			}
+		})
+		return nil
+	}
+}
+
+// wsPongHandler returns a pong handler that records LastPongRecv for the
+// client and always returns nil.
+func wsPongHandler(nodeName string, conn *websocket.Conn) func(message string) error {
+	return func(message string) error {
+		updateWSClient(nodeName, conn, func(client *types.WSClient) {
+			client.LastPongRecv = types.PingPongInfo{
+				Time:    time.Now(),
+				Message: message,
+			}
+		})
+		return nil
+	}
+}
+
+// wsCloseHandler returns a close handler that unregisters the client (only
+// while conn is still its registered connection) and echoes the close frame
+// back to the peer. The handler always returns nil.
+func wsCloseHandler(nodeName string, conn *websocket.Conn) func(code int, text string) error {
+	return func(code int, text string) error {
+		logger.Infof("Websocket connection closed for %s: %d (exit code), message: %q", nodeName, code, text)
+
+		wsMutex.Lock()
+		if client, ok := wsClients[nodeName]; ok && client.Conn == conn {
+			delete(wsClients, nodeName)
+		}
+		wsMutex.Unlock()
+
+		message := websocket.FormatCloseMessage(code, text)
+		if err := conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second)); err != nil {
+			logger.Debugf("Failed to send close message to %s: %v", nodeName, err)
+		}
+		return nil
+	}
+}
+
+// isWSConnClosed reports whether conn should be treated as gone: it is no
+// longer the connection registered for nodeName, or err is a websocket close
+// error or a non-temporary network operation error.
+func isWSConnClosed(nodeName string, conn *websocket.Conn, err error) bool {
+	wsMutex.Lock()
+	client, registered := wsClients[nodeName]
+	wsMutex.Unlock()
+
+	if !registered || client.Conn != conn {
+		return true
+	}
+	if _, ok := err.(*websocket.CloseError); ok {
+		return true
+	}
+	if e, ok := err.(*net.OpError); ok && !e.Temporary() {
+		return true
+	}
+	return false
+}
+
+// GetClients returns a snapshot of the registered clients keyed by node name.
+// The returned map is a copy taken under wsMutex, so callers may iterate or
+// encode it while connections come and go.
+func GetClients() map[string]types.WSClient {
+	wsMutex.Lock()
+	defer wsMutex.Unlock()
+
+	clients := make(map[string]types.WSClient, len(wsClients))
+	for nodeName, client := range wsClients {
+		clients[nodeName] = client
+	}
+	return clients
+}
diff --git a/web/src/Root.tsx b/web/src/Root.tsx index b1c4c3aab6..d2d23d6484 100644 --- a/web/src/Root.tsx +++ b/web/src/Root.tsx @@ -31,6 +31,7 @@ import AppLicense from "@components/apps/AppLicense"; import AppRegistrySettings from "@components/apps/AppRegistrySettings"; import AppIdentityServiceSettings from "@components/apps/AppIdentityServiceSettings"; import TroubleshootContainer from "@components/troubleshoot/TroubleshootContainer"; +import DebugInfo from "@components/DebugInfo"; import Footer from "./components/shared/Footer"; import NavBar from "./components/shared/NavBar"; @@ -749,6 +750,7 @@ const Root = () => { /> } />{" "} } /> + } /> {
+  const [debugData, setDebugData] = useState(null);
+  const [loading, setLoading] = useState(true);
+
+  useEffect(() => {
+    fetchDebugInfo();
+    const intervalId = setInterval(fetchDebugInfo, 10000);
+    return () => clearInterval(intervalId);
+  }, []);
+
+  async function fetchDebugInfo() {
+    try {
+      const response = await fetch(`${process.env.API_ENDPOINT}/debug`, {
+        headers: {
+          "Content-Type": "application/json",
+        },
+        credentials: "include",
+      });
+      const data = await response.json();
+      setDebugData(data);
+    } catch (error) {
+      console.error("Error fetching debug info:", error);
+    } finally {
+      setLoading(false);
+    }
+  }
+
+  function isStale(clientInfo) {
+    // All four ping/pong timestamps are checked, each exactly once.
+    const timestamps = [
+      clientInfo.lastPingSent.time,
+      clientInfo.lastPongRecv.time,
+      clientInfo.lastPingRecv.time,
+      clientInfo.lastPongSent.time,
+    ];
+    return timestamps.some(
+ (timestamp) => new Date() - new Date(timestamp) > 60 * 1000 + ); + } + + if (loading) { + return
Loading debug information...
; + } + + if (!debugData?.wsClients) { + return
No debug information available.
; + } + + return ( +
+

+ Websocket Clients +

+
+ {Object.entries(debugData.wsClients).map(([nodeName, clientInfo]) => ( +
+

+ + {nodeName} +

+
+

+ Connected At:{" "} + {Utilities.dateFormat( + clientInfo.connectedAt, + "MM/DD/YY @ hh:mm a z" + )} +

+
+
+

+ Last Ping Sent:{" "} + {Utilities.dateFormat( + clientInfo.lastPingSent.time, + "MM/DD/YY @ hh:mm:ss a z" + )}{" "} + (Message:{" "} + {clientInfo.lastPingSent.message || "N/A"}) +

+
+
+

+ Last Pong Received:{" "} + {Utilities.dateFormat( + clientInfo.lastPongRecv.time, + "MM/DD/YY @ hh:mm:ss a z" + )}{" "} + (Message:{" "} + {clientInfo.lastPongRecv.message || "N/A"}) +

+
+
+

+ Last Ping Received:{" "} + {Utilities.dateFormat( + clientInfo.lastPingRecv.time, + "MM/DD/YY @ hh:mm:ss a z" + )}{" "} + (Message:{" "} + {clientInfo.lastPingRecv.message || "N/A"}) +

+
+
+

+ Last Pong Sent:{" "} + {Utilities.dateFormat( + clientInfo.lastPongSent.time, + "MM/DD/YY @ hh:mm:ss a z" + )}{" "} + (Message:{" "} + {clientInfo.lastPongSent.message || "N/A"}) +

+
+
+ ))} +
+
+ ); +}; + +export default DebugInfo;
diff --git a/web/src/scss/components/DebugInfo.scss b/web/src/scss/components/DebugInfo.scss
new file mode 100644
index 0000000000..627802edf7
--- /dev/null
+++ b/web/src/scss/components/DebugInfo.scss
@@ -0,0 +1,17 @@
+@import "../variables.scss";
+
+// 8px round status dot rendered inline; green by default.
+.node-status {
+  display: inline-block;
+  width: 8px;
+  height: 8px;
+  margin-right: 4px;
+  border-radius: 100%;
+  vertical-align: 1px;
+  background-color: #44bb66;
+}
+
+// Red variant, applied when the element also carries the "disconnected" class.
+.node-status.disconnected {
+  background-color: #bc4752;
+}