From 61e2382d70109289554c8dba4bc0f36c61066432 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=80=86=E6=B5=81=E8=80=8C=E4=B8=8A?= <1666888816@qq.com> Date: Sun, 18 Feb 2024 18:15:51 +0800 Subject: [PATCH] perf: remove mq etcd dependency, export log api MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 逆流而上 <1666888816@qq.com> --- api/api/api_interface.go | 1 + api/api_routers/version2/v2Routers.go | 1 + api/controller/pods.go | 63 +++++++- api/server/api.go | 12 +- cmd/api/main.go | 3 +- cmd/mq/mq.go | 23 ++- cmd/mq/server/server.go | 84 ----------- config/configs/config.go | 8 +- mq/api/api.go | 198 -------------------------- mq/api/mq/mq.go | 49 ++----- mq/api/mq/store.go | 81 +++++++++++ mq/mqcomponent/grpcserver/server.go | 54 +++++++ mq/mqcomponent/metrics/server.go | 53 +++++++ mq/mqcomponent/mqclient/client.go | 33 +++++ node/api/router/router.go | 4 +- pkg/component/core.go | 15 ++ util/etcd/queue.go | 94 ------------ util/etcd/watch.go | 78 ---------- 18 files changed, 340 insertions(+), 514 deletions(-) delete mode 100644 cmd/mq/server/server.go delete mode 100644 mq/api/api.go create mode 100644 mq/api/mq/store.go create mode 100644 mq/mqcomponent/grpcserver/server.go create mode 100644 mq/mqcomponent/metrics/server.go create mode 100644 mq/mqcomponent/mqclient/client.go delete mode 100644 util/etcd/queue.go delete mode 100644 util/etcd/watch.go diff --git a/api/api/api_interface.go b/api/api/api_interface.go index 5fcc8ac437..f4b903b3fb 100644 --- a/api/api/api_interface.go +++ b/api/api/api_interface.go @@ -281,6 +281,7 @@ type AppRestoreInterface interface { // PodInterface defines api methods about k8s pods. type PodInterface interface { PodDetail(w http.ResponseWriter, r *http.Request) + PodLogs(w http.ResponseWriter, r *http.Request) } // RegistryAuthSecretInterface registry auth secret interface diff --git a/api/api_routers/version2/v2Routers.go b/api/api_routers/version2/v2Routers.go index 968f77131c..659d1195d9 100644 --- a/api/api_routers/version2/v2Routers.go +++ b/api/api_routers/version2/v2Routers.go @@ -64,6 +64,7 @@ func (v2 *V2) Routes() chi.Router { r.Mount("/monitor", v2.monitorRouter()) r.Mount("/helm", v2.helmRouter()) r.Mount("/proxy-pass", v2.proxyRoute()) + r.Get("/pods/logs", controller.GetManager().PodLogs) return r } diff --git a/api/controller/pods.go b/api/controller/pods.go index 498aa4b42e..4cf1332910 100644 --- a/api/controller/pods.go +++ b/api/controller/pods.go @@ -19,18 +19,21 @@ package controller import ( + "bufio" "fmt" - "net/http" - "strings" - "github.com/go-chi/chi" "github.com/goodrain/rainbond/api/handler" ctxutil "github.com/goodrain/rainbond/api/util/ctx" "github.com/goodrain/rainbond/db" "github.com/goodrain/rainbond/db/model" + "github.com/goodrain/rainbond/pkg/component/k8s" httputil "github.com/goodrain/rainbond/util/http" "github.com/goodrain/rainbond/worker/server" "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "net/http" + "strings" + "time" ) // PodController is an implementation of PodInterface @@ -114,3 +117,57 @@ func (p *PodController) PodDetail(w http.ResponseWriter, r *http.Request) { } httputil.ReturnSuccess(r, w, pd) } + +// PodLogs - +func (p *PodController) PodLogs(w http.ResponseWriter, r *http.Request) { + podNamespace := r.URL.Query().Get("pod_ns") + podName := r.URL.Query().Get("pod_name") + // Get Kubernetes pod logs + lines := int64(1280) + req := k8s.Default().Clientset.CoreV1().Pods(podNamespace).GetLogs(podName, &corev1.PodLogOptions{ + Follow: true, + Timestamps: true, + TailLines: &lines, + }) + stream, err := req.Stream(r.Context()) + if err != nil { + logrus.Errorf("Error opening log stream: %v", err) + http.Error(w, "Error opening log stream", http.StatusInternalServerError) + return + } + defer stream.Close() + // Use Flusher to send headers to the client + flusher, ok := w.(http.Flusher) + if !ok { + logrus.Errorf("Streaming not supported") + http.Error(w, "Streaming not supported", http.StatusInternalServerError) + return + } + + // Set headers for SSE + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + scanner := bufio.NewScanner(stream) + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + + var messages []string + + for { + select { + case <-ticker.C: + fmt.Fprintf(w, strings.Join(messages, "\n\n")) + messages = messages[:0] // Clear the slice + flusher.Flush() + case <-r.Context().Done(): + logrus.Warningf("Request context done: %v", r.Context().Err()) + return + default: + if len(messages) < 128 && scanner.Scan() { + messages = append(messages, "data: "+scanner.Text()) + } + } + } +} diff --git a/api/server/api.go b/api/server/api.go index 4825a4e507..0749bc911a 100644 --- a/api/server/api.go +++ b/api/server/api.go @@ -23,18 +23,16 @@ import ( "crypto/tls" "crypto/x509" "github.com/coreos/etcd/clientv3" + "github.com/goodrain/rainbond/api/handler" "github.com/goodrain/rainbond/pkg/interceptors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/common/version" "io/ioutil" "log" "net/http" "os" "strings" - "time" - - "github.com/goodrain/rainbond/api/handler" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/prometheus/common/version" "github.com/goodrain/rainbond/util" @@ -108,7 +106,7 @@ func (m *Manager) SetMiddleware() { //Gracefully absorb panics and prints the stack trace r.Use(interceptors.Recoverer) //request time out - r.Use(middleware.Timeout(time.Second * 5)) + //r.Use(middleware.Timeout(time.Second * 5)) //simple authz if os.Getenv("TOKEN") != "" { r.Use(apimiddleware.FullToken) diff --git a/cmd/api/main.go b/cmd/api/main.go index ec976a9713..396c7184d8 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -47,7 +47,8 @@ func main() { APIConfig: s.Config, }) // 启动 rbd-api - err := rainbond.New(context.Background(), configs.Default()).Registry(component.Database()). + err := rainbond.New(context.Background(), configs.Default()). + Registry(component.Database()). Registry(component.Grpc()). Registry(component.Event()). Registry(component.K8sClient()). diff --git a/cmd/mq/mq.go b/cmd/mq/mq.go index 2f7d55b319..b099c26b93 100644 --- a/cmd/mq/mq.go +++ b/cmd/mq/mq.go @@ -19,13 +19,15 @@ package main import ( - "fmt" + "context" + "github.com/goodrain/rainbond/config/configs" + "github.com/goodrain/rainbond/pkg/component" + "github.com/goodrain/rainbond/pkg/rainbond" + "github.com/sirupsen/logrus" "os" "github.com/goodrain/rainbond/cmd" "github.com/goodrain/rainbond/cmd/mq/option" - "github.com/goodrain/rainbond/cmd/mq/server" - "github.com/spf13/pflag" ) @@ -37,8 +39,17 @@ func main() { s.AddFlags(pflag.CommandLine) pflag.Parse() s.SetLog() - if err := server.Run(s); err != nil { - fmt.Fprintf(os.Stderr, "error: %v\n", err) - os.Exit(1) + + configs.SetDefault(&configs.Config{ + AppName: "rbd-mq", + MQConfig: s.Config, + }) + err := rainbond.New(context.Background(), configs.Default()). + Registry(component.MQClient()). + RegistryCancel(component.MQGrpcServer()). + RegistryCancel(component.MQHealthServer()). + Start() + if err != nil { + logrus.Errorf("start rbd mq error %s", err.Error()) } } diff --git a/cmd/mq/server/server.go b/cmd/mq/server/server.go deleted file mode 100644 index 04d9abf07d..0000000000 --- a/cmd/mq/server/server.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (C) 2014-2018 Goodrain Co., Ltd. -// RAINBOND, Application Management Platform - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package server - -import ( - "os" - "os/signal" - "syscall" - - "github.com/goodrain/rainbond/cmd/mq/option" - discover "github.com/goodrain/rainbond/discover.v2" - "github.com/goodrain/rainbond/mq/api" - - etcdutil "github.com/goodrain/rainbond/util/etcd" - "github.com/sirupsen/logrus" -) - -//Run start run -func Run(s *option.MQServer) error { - errChan := make(chan error) - - //step 1:start mq api manager - apiManager, err := api.NewManager(s.Config) - if err != nil { - return err - } - apiManager.Start(errChan) - defer apiManager.Stop() - - etcdClientArgs := &etcdutil.ClientArgs{ - Endpoints: s.Config.EtcdEndPoints, - CaFile: s.Config.EtcdCaFile, - CertFile: s.Config.EtcdCertFile, - KeyFile: s.Config.EtcdKeyFile, - } - - //step 2:regist mq endpoint - keepalive, err := discover.CreateKeepAlive(etcdClientArgs, "rainbond_mq", s.Config.HostName, s.Config.HostIP, s.Config.APIPort) - if err != nil { - return err - } - if err := keepalive.Start(); err != nil { - return err - } - defer keepalive.Stop() - - //step 3:regist prometheus export endpoint - exportKeepalive, err := discover.CreateKeepAlive(etcdClientArgs, "mq", s.Config.HostName, s.Config.HostIP, 6301) - if err != nil { - return err - } - if err := exportKeepalive.Start(); err != nil { - return err - } - defer exportKeepalive.Stop() - - //step finally: listen Signal - term := make(chan os.Signal) - signal.Notify(term, os.Interrupt, syscall.SIGTERM) - select { - case <-term: - logrus.Warn("Received SIGTERM, exiting gracefully...") - case err := <-errChan: - logrus.Errorf("Received a error %s, exiting gracefully...", err.Error()) - } - logrus.Info("See you next time!") - return nil -} diff --git a/config/configs/config.go b/config/configs/config.go index 03272fd7e8..59379a855e 100644 --- a/config/configs/config.go +++ b/config/configs/config.go @@ -1,6 +1,9 @@ package configs -import "github.com/goodrain/rainbond/cmd/api/option" +import ( + apiconfig "github.com/goodrain/rainbond/cmd/api/option" + mqconfig "github.com/goodrain/rainbond/cmd/mq/option" +) // Env - type Env string @@ -11,7 +14,8 @@ type Config struct { Version string Env Env Debug bool - APIConfig option.Config + APIConfig apiconfig.Config + MQConfig mqconfig.Config } var defaultConfig *Config diff --git a/mq/api/api.go b/mq/api/api.go deleted file mode 100644 index 32006a6864..0000000000 --- a/mq/api/api.go +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright (C) 2014-2018 Goodrain Co., Ltd. -// RAINBOND, Application Management Platform - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package api - -import ( - "fmt" - "net" - "net/http" - "time" - - "github.com/goodrain/rainbond/cmd/mq/option" - "github.com/goodrain/rainbond/mq/api/controller" - "github.com/goodrain/rainbond/mq/api/mq" - "github.com/goodrain/rainbond/mq/monitor" - - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" - - "golang.org/x/net/context" - - _ "net/http/pprof" - - restful "github.com/emicklei/go-restful" - swagger "github.com/emicklei/go-restful-swagger12" - grpcserver "github.com/goodrain/rainbond/mq/api/grpc/server" - httputil "github.com/goodrain/rainbond/util/http" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/prometheus/common/version" - "github.com/sirupsen/logrus" -) - -type Manager struct { - container *restful.Container - ctx context.Context - cancel context.CancelFunc - conf option.Config - server Server - actionMQ mq.ActionMQ -} -type Server interface { - Server() error - Close() error -} - -type httpServer struct { - server *http.Server -} - -func (h *httpServer) Server() error { - if err := h.server.ListenAndServe(); err != nil { - logrus.Error("mq api http listen error.", err.Error()) - return err - } - return nil -} -func (h *httpServer) Close() error { - if h.server != nil { - ctx, _ := context.WithTimeout(context.Background(), time.Second*5) - return h.server.Shutdown(ctx) - } - return nil -} - -type grpcServer struct { - server *grpc.Server - lis net.Listener -} - -func (h *grpcServer) Server() error { - if err := h.server.Serve(h.lis); err != nil { - logrus.Error("mq api grpc listen error.", err.Error()) - return err - } - return nil -} -func (h *grpcServer) Close() error { - return h.lis.Close() -} - -//NewManager api manager -func NewManager(c option.Config) (*Manager, error) { - ctx, cancel := context.WithCancel(context.Background()) - actionMQ := mq.NewActionMQ(ctx, c) - manager := &Manager{ - ctx: ctx, - cancel: cancel, - conf: c, - actionMQ: actionMQ, - } - go func() { - manager.Prometheus() - health() - if err := http.ListenAndServe(":6301", nil); err != nil { - logrus.Error("mq pprof listen error.", err.Error()) - } - }() - if c.RunMode == "http" { - wsContainer := restful.NewContainer() - server := &http.Server{Addr: fmt.Sprintf(":%d", c.APIPort), Handler: wsContainer} - controller.Register(wsContainer, actionMQ) - manager.container = wsContainer - manager.server = &httpServer{server} - manager.doc() - logrus.Info("mq server api run with http") - } else { - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", c.APIPort)) - if err != nil { - logrus.Errorf("failed to listen: %v", err) - return nil, err - } - s := grpc.NewServer() - grpcserver.RegisterServer(s, actionMQ) - // Register reflection service on gRPC server. - reflection.Register(s) - manager.server = &grpcServer{ - server: s, - lis: lis, - } - logrus.Info("mq server api run with gRPC") - } - - return manager, nil -} - -//Start 启动 -func (m *Manager) Start(errChan chan error) { - logrus.Infof("api server start listening on 0.0.0.0:%d", m.conf.APIPort) - err := m.actionMQ.Start() - if err != nil { - errChan <- err - } - go func() { - if err := m.server.Server(); err != nil { - logrus.Error("mq api listen error.", err.Error()) - errChan <- err - } - }() -} - -func (m *Manager) doc() { - // Optionally, you can install the Swagger Service which provides a nice Web UI on your REST API - // You need to download the Swagger HTML5 assets and change the FilePath location in the config below. - // Open http://localhost:8080/apidocs and enter http://localhost:8080/swagger.json in the api input field. - config := swagger.Config{ - WebServices: m.container.RegisteredWebServices(), // you control what services are visible - ApiPath: "/swagger.json", - - // Optionally, specify where the UI is located - SwaggerPath: "/apidocs/", - Info: swagger.Info{ - Title: "goodrain mq api doc.", - }, - ApiVersion: "1.0", - SwaggerFilePath: "./dist"} - swagger.RegisterSwaggerService(config, m.container) - -} - -//Stop 停止 -func (m *Manager) Stop() error { - logrus.Info("api server is stoping.") - m.cancel() - //m.server.Close() - return m.actionMQ.Stop() -} - -//Prometheus prometheus init -func (m *Manager) Prometheus() { - prometheus.MustRegister(version.NewCollector("acp_mq")) - exporter := monitor.NewExporter(m.actionMQ) - prometheus.MustRegister(exporter) - http.Handle("/metrics", promhttp.Handler()) -} - -func health() { - http.HandleFunc("/health", checkHalth) -} - -func checkHalth(w http.ResponseWriter, r *http.Request) { - httputil.ReturnSuccess(r, w, map[string]string{"status": "health", "info": "mq service health"}) -} diff --git a/mq/api/mq/mq.go b/mq/api/mq/mq.go index 11e6f8b815..3716aca5b5 100644 --- a/mq/api/mq/mq.go +++ b/mq/api/mq/mq.go @@ -19,19 +19,14 @@ package mq import ( - "github.com/coreos/etcd/clientv3" + "github.com/goodrain/rainbond/cmd/mq/option" + "github.com/goodrain/rainbond/mq/client" "os" "strings" "sync" - "time" - - "github.com/goodrain/rainbond/cmd/mq/option" - "github.com/goodrain/rainbond/mq/client" "golang.org/x/net/context" - etcdutil "github.com/goodrain/rainbond/util/etcd" - "github.com/sirupsen/logrus" ) @@ -63,28 +58,17 @@ func NewActionMQ(ctx context.Context, c option.Config) ActionMQ { } type etcdQueue struct { + client *KeyValueStore config option.Config ctx context.Context queues map[string]string queuesLock sync.Mutex - client *clientv3.Client } func (e *etcdQueue) Start() error { logrus.Debug("etcd message queue client starting") - etcdClientArgs := &etcdutil.ClientArgs{ - Endpoints: e.config.EtcdEndPoints, - CaFile: e.config.EtcdCaFile, - CertFile: e.config.EtcdCertFile, - KeyFile: e.config.EtcdKeyFile, - DialTimeout: time.Duration(e.config.EtcdTimeout) * time.Second, - } - cli, err := etcdutil.NewClient(context.Background(), etcdClientArgs) - if err != nil { - etcdutil.HandleEtcdError(err) - return err - } - e.client = cli + + e.client = NewKeyValueStore() topics := os.Getenv("topics") if topics != "" { ts := strings.Split(topics, ",") @@ -121,9 +105,7 @@ func (e *etcdQueue) GetAllTopics() []string { } func (e *etcdQueue) Stop() error { - if e.client != nil { - e.client.Close() - } + return nil } func (e *etcdQueue) queueKey(topic string) string { @@ -131,25 +113,16 @@ func (e *etcdQueue) queueKey(topic string) string { } func (e *etcdQueue) Enqueue(ctx context.Context, topic, value string) error { EnqueueNumber++ - queue := etcdutil.NewQueue(ctx, e.client, e.queueKey(topic)) - return queue.Enqueue(value) + e.client.Put(e.queueKey(topic), value) + return nil } func (e *etcdQueue) Dequeue(ctx context.Context, topic string) (string, error) { DequeueNumber++ - queue := etcdutil.NewQueue(ctx, e.client, e.queueKey(topic)) - return queue.Dequeue() + res, _ := e.client.Get(e.queueKey(topic)) + return res, nil } func (e *etcdQueue) MessageQueueSize(topic string) int64 { - ctx, cancel := context.WithCancel(e.ctx) - defer cancel() - res, err := e.client.Get(ctx, e.queueKey(topic), clientv3.WithPrefix()) - if err != nil { - logrus.Errorf("get message queue size failure %s", err.Error()) - } - if res != nil { - return res.Count - } - return 0 + return e.client.Size(topic) } diff --git a/mq/api/mq/store.go b/mq/api/mq/store.go new file mode 100644 index 0000000000..46f5ce36f5 --- /dev/null +++ b/mq/api/mq/store.go @@ -0,0 +1,81 @@ +package mq + +import ( + "sync" + "time" +) + +// KeyValueStore 是一个简单的键值存储结构,支持多值 +type KeyValueStore struct { + data map[string][]string + mu sync.Mutex + cond *sync.Cond +} + +// NewKeyValueStore 创建一个新的键值存储实例 +func NewKeyValueStore() *KeyValueStore { + kv := &KeyValueStore{ + data: make(map[string][]string), + } + kv.cond = sync.NewCond(&kv.mu) + return kv +} + +// Put 将键值对放入存储 +func (kv *KeyValueStore) Put(key, value string) { + kv.mu.Lock() + defer kv.mu.Unlock() + + // 如果键已存在,则追加值;否则,创建新的值切片 + if existingValues, ok := kv.data[key]; ok { + kv.data[key] = append(existingValues, value) + } else { + kv.data[key] = []string{value} + } + kv.cond.Broadcast() +} + +// Get 根据键获取第一个值并删除该键值对中的第一个值 +func (kv *KeyValueStore) Get(key string) (string, bool) { + kv.mu.Lock() + defer kv.mu.Unlock() + // 如果值列表为空,等待通知或超时 + timeout := time.Now().Add(5 * time.Second) + for len(kv.data[key]) == 0 { + select { + case <-time.After(timeout.Sub(time.Now())): + return "", false // 超时后返回空值 + default: + waitChan := make(chan struct{}) + kv.cond.L.Unlock() + go func() { + kv.cond.L.Lock() + close(waitChan) + }() + <-waitChan + } + } + + // 获取值列表 + values, ok := kv.data[key] + if ok && len(values) > 0 { + // 获取第一个值并从切片中删除 + firstValue := values[0] + if len(values) == 1 { + delete(kv.data, key) // 如果只有一个值,删除整个键值对 + } else { + kv.data[key] = values[1:] // 如果有多个值,删除第一个值 + } + return firstValue, true + } + + return "", false +} + +// Size 返回特定键的值列表大小 +func (kv *KeyValueStore) Size(topic string) int64 { + kv.mu.Lock() + defer kv.mu.Unlock() + + return int64(len(kv.data[topic])) +} diff --git a/mq/mqcomponent/grpcserver/server.go b/mq/mqcomponent/grpcserver/server.go new file mode 100644 index 0000000000..2510d7cc50 --- /dev/null +++ b/mq/mqcomponent/grpcserver/server.go @@ -0,0 +1,54 @@ +package grpcserver + +import ( + "context" + "fmt" + "github.com/goodrain/rainbond/config/configs" + grpcserver "github.com/goodrain/rainbond/mq/api/grpc/server" + "github.com/goodrain/rainbond/mq/mqcomponent/mqclient" + "github.com/goodrain/rainbond/pkg/gogo" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "net" +) + +// NewGrpcServer - +func NewGrpcServer() *Component { + return &Component{} +} + +// Component - +type Component struct { + server *grpc.Server + lis net.Listener +} + +// StartCancel - +func (c *Component) StartCancel(ctx context.Context, cancel context.CancelFunc, cfg *configs.Config) error { + s := grpc.NewServer() + c.server = s + grpcserver.RegisterServer(s, mqclient.Default().ActionMQ()) + return gogo.Go(func(ctx context.Context) (err error) { + defer cancel() + c.lis, err = net.Listen("tcp", fmt.Sprintf(":%d", cfg.MQConfig.APIPort)) + logrus.Infof("grpc server listen on %d", cfg.MQConfig.APIPort) + if err := s.Serve(c.lis); err != nil { + logrus.Error("mq api grpc listen error.", err.Error()) + return err + } + return err + }) +} + +// Start - +func (c *Component) Start(ctx context.Context, cfg *configs.Config) (err error) { + panic("implement me") +} + +// CloseHandle - +func (c *Component) CloseHandle() { + err := c.lis.Close() + if err != nil { + logrus.Errorf("failed to close listener: %v", err) + } +} diff --git a/mq/mqcomponent/metrics/server.go b/mq/mqcomponent/metrics/server.go new file mode 100644 index 0000000000..065b1d92bf --- /dev/null +++ b/mq/mqcomponent/metrics/server.go @@ -0,0 +1,53 @@ +package metrics + +import ( + "context" + "github.com/goodrain/rainbond/mq/monitor" + "github.com/goodrain/rainbond/mq/mqcomponent/mqclient" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/goodrain/rainbond/config/configs" + "github.com/goodrain/rainbond/pkg/gogo" + httputil "github.com/goodrain/rainbond/util/http" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/version" + "github.com/sirupsen/logrus" + "net/http" +) + +// Server 要做一些监控或者指标收集 +type Server struct { +} + +// Start - +func (s *Server) Start(ctx context.Context, cfg *configs.Config) error { + return s.StartCancel(ctx, nil, cfg) +} + +// CloseHandle - +func (s *Server) CloseHandle() { +} + +// NewMetricsServer - +func NewMetricsServer() *Server { + return &Server{} +} + +// StartCancel - +func (s *Server) StartCancel(ctx context.Context, cancel context.CancelFunc, cfg *configs.Config) error { + prometheus.MustRegister(version.NewCollector("acp_mq")) + prometheus.MustRegister(monitor.NewExporter(mqclient.Default().ActionMQ())) + http.Handle("/metrics", promhttp.Handler()) + http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + httputil.ReturnSuccess(r, w, map[string]string{"status": "health", "info": "mq service health"}) + }) + return gogo.Go(func(ctx context.Context) error { + logrus.Infof("start metrics server") + defer cancel() + if err := http.ListenAndServe(":6301", nil); err != nil { + logrus.Error("mq pprof listen error.", err.Error()) + return err + } + return nil + }) +} diff --git a/mq/mqcomponent/mqclient/client.go b/mq/mqcomponent/mqclient/client.go new file mode 100644 index 0000000000..213728a813 --- /dev/null +++ b/mq/mqcomponent/mqclient/client.go @@ -0,0 +1,33 @@ +package mqclient + +import ( + "context" + "github.com/goodrain/rainbond/config/configs" + "github.com/goodrain/rainbond/mq/api/mq" +) + +var defaultMQClientComponent *Component + +type Component struct { + actionMQ mq.ActionMQ +} + +func Default() *Component { + return defaultMQClientComponent +} +func (m *Component) ActionMQ() mq.ActionMQ { + return m.actionMQ +} + +func (m *Component) Start(ctx context.Context, cfg *configs.Config) error { + m.actionMQ = mq.NewActionMQ(ctx, cfg.MQConfig) + return m.actionMQ.Start() +} + +func (m *Component) CloseHandle() { +} + +func MQClient() *Component { + defaultMQClientComponent = &Component{} + return defaultMQClientComponent +} diff --git a/node/api/router/router.go b/node/api/router/router.go index f5e34895ce..6ccb0e9a8d 100644 --- a/node/api/router/router.go +++ b/node/api/router/router.go @@ -20,8 +20,6 @@ package router import ( "github.com/goodrain/rainbond/pkg/interceptors" - "time" - "github.com/sirupsen/logrus" "github.com/goodrain/rainbond/node/api/controller" @@ -44,7 +42,7 @@ func Routers(mode string) *chi.Mux { //Gracefully absorb panics and prints the stack trace r.Use(interceptors.Recoverer) //request time out - r.Use(middleware.Timeout(time.Second * 5)) + //r.Use(middleware.Timeout(time.Second * 5)) r.Mount("/v1", DisconverRoutes()) r.Route("/v2", func(r chi.Router) { r.Get("/ping", controller.Ping) diff --git a/pkg/component/core.go b/pkg/component/core.go index e30dece6ad..b1d08f1d54 100644 --- a/pkg/component/core.go +++ b/pkg/component/core.go @@ -26,6 +26,9 @@ import ( "github.com/goodrain/rainbond/api/server" "github.com/goodrain/rainbond/config/configs" "github.com/goodrain/rainbond/event" + "github.com/goodrain/rainbond/mq/mqcomponent/grpcserver" + "github.com/goodrain/rainbond/mq/mqcomponent/metrics" + "github.com/goodrain/rainbond/mq/mqcomponent/mqclient" "github.com/goodrain/rainbond/pkg/component/etcd" "github.com/goodrain/rainbond/pkg/component/grpc" "github.com/goodrain/rainbond/pkg/component/hubregistry" @@ -128,3 +131,15 @@ func Proxy() rainbond.FuncComponent { return nil } } + +func MQHealthServer() rainbond.ComponentCancel { + return metrics.NewMetricsServer() +} + +func MQGrpcServer() rainbond.ComponentCancel { + return grpcserver.NewGrpcServer() +} + +func MQClient() rainbond.Component { + return mqclient.MQClient() +} diff --git a/util/etcd/queue.go b/util/etcd/queue.go deleted file mode 100644 index 84241760a8..0000000000 --- a/util/etcd/queue.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (C) 2014-2018 Goodrain Co., Ltd. -// RAINBOND, Application Management Platform - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package etcd - -import ( - "fmt" - - v3 "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" - "golang.org/x/net/context" -) - -// Queue implements a multi-reader, multi-writer distributed queue. -type Queue struct { - client *v3.Client - ctx context.Context - - keyPrefix string -} - -// NewQueue new queue -func NewQueue(ctx context.Context, client *v3.Client, keyPrefix string) *Queue { - return &Queue{client, ctx, keyPrefix} -} - -// Enqueue en queue -func (q *Queue) Enqueue(val string) error { - _, err := newUniqueKV(q.ctx, q.client, q.keyPrefix, val) - return err -} - -// Dequeue returns Enqueue()'d elements in FIFO order. If the -// queue is empty, Dequeue blocks until elements are available. -func (q *Queue) Dequeue() (string, error) { - for { - // TODO: fewer round trips by fetching more than one key - resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...) - if err != nil { - return "", err - } - - kv, err := claimFirstKey(q.ctx, q.client, resp.Kvs) - if err != nil { - return "", err - } else if kv != nil { - return string(kv.Value), nil - } else if resp.More { - // missed some items, retry to read in more - return q.Dequeue() - } - - // nothing yet; wait on elements - ev, err := WaitPrefixEvents( - q.client, - q.keyPrefix, - resp.Header.Revision, - []mvccpb.Event_EventType{mvccpb.PUT}) - if err != nil { - if err == ErrNoUpdateForLongTime { - continue - } - return "", err - } - if ev == nil { - return "", fmt.Errorf("event is nil") - } - if ev.Kv == nil { - return "", fmt.Errorf("event key value is nil") - } - ok, err := deleteRevKey(q.ctx, q.client, string(ev.Kv.Key), ev.Kv.ModRevision) - if err != nil { - return "", err - } else if !ok { - return q.Dequeue() - } - return string(ev.Kv.Value), err - } -} diff --git a/util/etcd/watch.go b/util/etcd/watch.go deleted file mode 100644 index 9cf108c13a..0000000000 --- a/util/etcd/watch.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright (C) 2014-2018 Goodrain Co., Ltd. -// RAINBOND, Application Management Platform - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package etcd - -import ( - "fmt" - "time" - - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/sirupsen/logrus" - "golang.org/x/net/context" -) - -//ErrNoUpdateForLongTime no update for long time , can reobservation of synchronous data -var ErrNoUpdateForLongTime = fmt.Errorf("not updated for a long time") - -//WaitPrefixEvents WaitPrefixEvents -func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []mvccpb.Event_EventType) (*clientv3.Event, error) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - logrus.Debug("start watch message from etcd queue") - wc := clientv3.NewWatcher(c).Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(rev)) - if wc == nil { - return nil, ErrNoWatcher - } - event := waitEvents(wc, evs) - if event != nil { - return event, nil - } - logrus.Debug("queue watcher sync, because of not updated for a long time") - return nil, ErrNoUpdateForLongTime -} - -//waitEvents this will return nil -func waitEvents(wc clientv3.WatchChan, evs []mvccpb.Event_EventType) *clientv3.Event { - i := 0 - timer := time.NewTimer(time.Second * 30) - defer timer.Stop() - for { - select { - case wresp := <-wc: - if wresp.Err() != nil { - logrus.Errorf("watch event failure %s", wresp.Err().Error()) - return nil - } - if len(wresp.Events) == 0 { - return nil - } - for _, ev := range wresp.Events { - if ev.Type == evs[i] { - i++ - if i == len(evs) { - return ev - } - } - } - case <-timer.C: - return nil - } - } -}