diff --git a/api/api/api_interface.go b/api/api/api_interface.go
index 5fcc8ac437..f4b903b3fb 100644
--- a/api/api/api_interface.go
+++ b/api/api/api_interface.go
@@ -281,6 +281,7 @@ type AppRestoreInterface interface {
// PodInterface defines api methods about k8s pods.
type PodInterface interface {
PodDetail(w http.ResponseWriter, r *http.Request)
+ PodLogs(w http.ResponseWriter, r *http.Request)
}
// RegistryAuthSecretInterface registry auth secret interface
diff --git a/api/api_routers/version2/v2Routers.go b/api/api_routers/version2/v2Routers.go
index 968f77131c..659d1195d9 100644
--- a/api/api_routers/version2/v2Routers.go
+++ b/api/api_routers/version2/v2Routers.go
@@ -64,6 +64,7 @@ func (v2 *V2) Routes() chi.Router {
r.Mount("/monitor", v2.monitorRouter())
r.Mount("/helm", v2.helmRouter())
r.Mount("/proxy-pass", v2.proxyRoute())
+ r.Get("/pods/logs", controller.GetManager().PodLogs)
return r
}
diff --git a/api/controller/pods.go b/api/controller/pods.go
index 498aa4b42e..4cf1332910 100644
--- a/api/controller/pods.go
+++ b/api/controller/pods.go
@@ -19,18 +19,21 @@
package controller
import (
+ "bufio"
"fmt"
- "net/http"
- "strings"
-
"github.com/go-chi/chi"
"github.com/goodrain/rainbond/api/handler"
ctxutil "github.com/goodrain/rainbond/api/util/ctx"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
+ "github.com/goodrain/rainbond/pkg/component/k8s"
httputil "github.com/goodrain/rainbond/util/http"
"github.com/goodrain/rainbond/worker/server"
"github.com/sirupsen/logrus"
+ corev1 "k8s.io/api/core/v1"
+ "net/http"
+ "strings"
+ "time"
)
// PodController is an implementation of PodInterface
@@ -114,3 +117,57 @@ func (p *PodController) PodDetail(w http.ResponseWriter, r *http.Request) {
}
httputil.ReturnSuccess(r, w, pd)
}
+
+// PodLogs -
+func (p *PodController) PodLogs(w http.ResponseWriter, r *http.Request) {
+ podNamespace := r.URL.Query().Get("pod_ns")
+ podName := r.URL.Query().Get("pod_name")
+ // Get Kubernetes pod logs
+ lines := int64(1280)
+ req := k8s.Default().Clientset.CoreV1().Pods(podNamespace).GetLogs(podName, &corev1.PodLogOptions{
+ Follow: true,
+ Timestamps: true,
+ TailLines: &lines,
+ })
+ stream, err := req.Stream(r.Context())
+ if err != nil {
+ logrus.Errorf("Error opening log stream: %v", err)
+ http.Error(w, "Error opening log stream", http.StatusInternalServerError)
+ return
+ }
+ defer stream.Close()
+ // Use Flusher to send headers to the client
+ flusher, ok := w.(http.Flusher)
+ if !ok {
+ logrus.Errorf("Streaming not supported")
+ http.Error(w, "Streaming not supported", http.StatusInternalServerError)
+ return
+ }
+
+ // Set headers for SSE
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+
+ scanner := bufio.NewScanner(stream)
+ ticker := time.NewTicker(50 * time.Millisecond)
+ defer ticker.Stop()
+
+ var messages []string
+
+ for {
+ select {
+ case <-ticker.C:
+ fmt.Fprintf(w, strings.Join(messages, "\n\n"))
+ messages = messages[:0] // Clear the slice
+ flusher.Flush()
+ case <-r.Context().Done():
+ logrus.Warningf("Request context done: %v", r.Context().Err())
+ return
+ default:
+ if len(messages) < 128 && scanner.Scan() {
+ messages = append(messages, "data: "+scanner.Text())
+ }
+ }
+ }
+}
diff --git a/api/server/api.go b/api/server/api.go
index 4825a4e507..0749bc911a 100644
--- a/api/server/api.go
+++ b/api/server/api.go
@@ -23,18 +23,16 @@ import (
"crypto/tls"
"crypto/x509"
"github.com/coreos/etcd/clientv3"
+ "github.com/goodrain/rainbond/api/handler"
"github.com/goodrain/rainbond/pkg/interceptors"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/prometheus/common/version"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
- "time"
-
- "github.com/goodrain/rainbond/api/handler"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/prometheus/common/version"
"github.com/goodrain/rainbond/util"
@@ -108,7 +106,7 @@ func (m *Manager) SetMiddleware() {
//Gracefully absorb panics and prints the stack trace
r.Use(interceptors.Recoverer)
//request time out
- r.Use(middleware.Timeout(time.Second * 5))
+ //r.Use(middleware.Timeout(time.Second * 5))
//simple authz
if os.Getenv("TOKEN") != "" {
r.Use(apimiddleware.FullToken)
diff --git a/cmd/api/main.go b/cmd/api/main.go
index ec976a9713..396c7184d8 100644
--- a/cmd/api/main.go
+++ b/cmd/api/main.go
@@ -47,7 +47,8 @@ func main() {
APIConfig: s.Config,
})
// 启动 rbd-api
- err := rainbond.New(context.Background(), configs.Default()).Registry(component.Database()).
+ err := rainbond.New(context.Background(), configs.Default()).
+ Registry(component.Database()).
Registry(component.Grpc()).
Registry(component.Event()).
Registry(component.K8sClient()).
diff --git a/cmd/mq/mq.go b/cmd/mq/mq.go
index 2f7d55b319..b099c26b93 100644
--- a/cmd/mq/mq.go
+++ b/cmd/mq/mq.go
@@ -19,13 +19,15 @@
package main
import (
- "fmt"
+ "context"
+ "github.com/goodrain/rainbond/config/configs"
+ "github.com/goodrain/rainbond/pkg/component"
+ "github.com/goodrain/rainbond/pkg/rainbond"
+ "github.com/sirupsen/logrus"
"os"
"github.com/goodrain/rainbond/cmd"
"github.com/goodrain/rainbond/cmd/mq/option"
- "github.com/goodrain/rainbond/cmd/mq/server"
-
"github.com/spf13/pflag"
)
@@ -37,8 +39,17 @@ func main() {
s.AddFlags(pflag.CommandLine)
pflag.Parse()
s.SetLog()
- if err := server.Run(s); err != nil {
- fmt.Fprintf(os.Stderr, "error: %v\n", err)
- os.Exit(1)
+
+ configs.SetDefault(&configs.Config{
+ AppName: "rbd-mq",
+ MQConfig: s.Config,
+ })
+ err := rainbond.New(context.Background(), configs.Default()).
+ Registry(component.MQClient()).
+ RegistryCancel(component.MQGrpcServer()).
+ RegistryCancel(component.MQHealthServer()).
+ Start()
+ if err != nil {
+ logrus.Errorf("start rbd mq error %s", err.Error())
}
}
diff --git a/cmd/mq/server/server.go b/cmd/mq/server/server.go
deleted file mode 100644
index 04d9abf07d..0000000000
--- a/cmd/mq/server/server.go
+++ /dev/null
@@ -1,84 +0,0 @@
-// Copyright (C) 2014-2018 Goodrain Co., Ltd.
-// RAINBOND, Application Management Platform
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version. For any non-GPL usage of Rainbond,
-// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
-// must be obtained first.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see .
-
-package server
-
-import (
- "os"
- "os/signal"
- "syscall"
-
- "github.com/goodrain/rainbond/cmd/mq/option"
- discover "github.com/goodrain/rainbond/discover.v2"
- "github.com/goodrain/rainbond/mq/api"
-
- etcdutil "github.com/goodrain/rainbond/util/etcd"
- "github.com/sirupsen/logrus"
-)
-
-//Run start run
-func Run(s *option.MQServer) error {
- errChan := make(chan error)
-
- //step 1:start mq api manager
- apiManager, err := api.NewManager(s.Config)
- if err != nil {
- return err
- }
- apiManager.Start(errChan)
- defer apiManager.Stop()
-
- etcdClientArgs := &etcdutil.ClientArgs{
- Endpoints: s.Config.EtcdEndPoints,
- CaFile: s.Config.EtcdCaFile,
- CertFile: s.Config.EtcdCertFile,
- KeyFile: s.Config.EtcdKeyFile,
- }
-
- //step 2:regist mq endpoint
- keepalive, err := discover.CreateKeepAlive(etcdClientArgs, "rainbond_mq", s.Config.HostName, s.Config.HostIP, s.Config.APIPort)
- if err != nil {
- return err
- }
- if err := keepalive.Start(); err != nil {
- return err
- }
- defer keepalive.Stop()
-
- //step 3:regist prometheus export endpoint
- exportKeepalive, err := discover.CreateKeepAlive(etcdClientArgs, "mq", s.Config.HostName, s.Config.HostIP, 6301)
- if err != nil {
- return err
- }
- if err := exportKeepalive.Start(); err != nil {
- return err
- }
- defer exportKeepalive.Stop()
-
- //step finally: listen Signal
- term := make(chan os.Signal)
- signal.Notify(term, os.Interrupt, syscall.SIGTERM)
- select {
- case <-term:
- logrus.Warn("Received SIGTERM, exiting gracefully...")
- case err := <-errChan:
- logrus.Errorf("Received a error %s, exiting gracefully...", err.Error())
- }
- logrus.Info("See you next time!")
- return nil
-}
diff --git a/config/configs/config.go b/config/configs/config.go
index 03272fd7e8..59379a855e 100644
--- a/config/configs/config.go
+++ b/config/configs/config.go
@@ -1,6 +1,9 @@
package configs
-import "github.com/goodrain/rainbond/cmd/api/option"
+import (
+ apiconfig "github.com/goodrain/rainbond/cmd/api/option"
+ mqconfig "github.com/goodrain/rainbond/cmd/mq/option"
+)
// Env -
type Env string
@@ -11,7 +14,8 @@ type Config struct {
Version string
Env Env
Debug bool
- APIConfig option.Config
+ APIConfig apiconfig.Config
+ MQConfig mqconfig.Config
}
var defaultConfig *Config
diff --git a/mq/api/api.go b/mq/api/api.go
deleted file mode 100644
index 32006a6864..0000000000
--- a/mq/api/api.go
+++ /dev/null
@@ -1,198 +0,0 @@
-// Copyright (C) 2014-2018 Goodrain Co., Ltd.
-// RAINBOND, Application Management Platform
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version. For any non-GPL usage of Rainbond,
-// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
-// must be obtained first.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see .
-
-package api
-
-import (
- "fmt"
- "net"
- "net/http"
- "time"
-
- "github.com/goodrain/rainbond/cmd/mq/option"
- "github.com/goodrain/rainbond/mq/api/controller"
- "github.com/goodrain/rainbond/mq/api/mq"
- "github.com/goodrain/rainbond/mq/monitor"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/reflection"
-
- "golang.org/x/net/context"
-
- _ "net/http/pprof"
-
- restful "github.com/emicklei/go-restful"
- swagger "github.com/emicklei/go-restful-swagger12"
- grpcserver "github.com/goodrain/rainbond/mq/api/grpc/server"
- httputil "github.com/goodrain/rainbond/util/http"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/prometheus/common/version"
- "github.com/sirupsen/logrus"
-)
-
-type Manager struct {
- container *restful.Container
- ctx context.Context
- cancel context.CancelFunc
- conf option.Config
- server Server
- actionMQ mq.ActionMQ
-}
-type Server interface {
- Server() error
- Close() error
-}
-
-type httpServer struct {
- server *http.Server
-}
-
-func (h *httpServer) Server() error {
- if err := h.server.ListenAndServe(); err != nil {
- logrus.Error("mq api http listen error.", err.Error())
- return err
- }
- return nil
-}
-func (h *httpServer) Close() error {
- if h.server != nil {
- ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
- return h.server.Shutdown(ctx)
- }
- return nil
-}
-
-type grpcServer struct {
- server *grpc.Server
- lis net.Listener
-}
-
-func (h *grpcServer) Server() error {
- if err := h.server.Serve(h.lis); err != nil {
- logrus.Error("mq api grpc listen error.", err.Error())
- return err
- }
- return nil
-}
-func (h *grpcServer) Close() error {
- return h.lis.Close()
-}
-
-//NewManager api manager
-func NewManager(c option.Config) (*Manager, error) {
- ctx, cancel := context.WithCancel(context.Background())
- actionMQ := mq.NewActionMQ(ctx, c)
- manager := &Manager{
- ctx: ctx,
- cancel: cancel,
- conf: c,
- actionMQ: actionMQ,
- }
- go func() {
- manager.Prometheus()
- health()
- if err := http.ListenAndServe(":6301", nil); err != nil {
- logrus.Error("mq pprof listen error.", err.Error())
- }
- }()
- if c.RunMode == "http" {
- wsContainer := restful.NewContainer()
- server := &http.Server{Addr: fmt.Sprintf(":%d", c.APIPort), Handler: wsContainer}
- controller.Register(wsContainer, actionMQ)
- manager.container = wsContainer
- manager.server = &httpServer{server}
- manager.doc()
- logrus.Info("mq server api run with http")
- } else {
- lis, err := net.Listen("tcp", fmt.Sprintf(":%d", c.APIPort))
- if err != nil {
- logrus.Errorf("failed to listen: %v", err)
- return nil, err
- }
- s := grpc.NewServer()
- grpcserver.RegisterServer(s, actionMQ)
- // Register reflection service on gRPC server.
- reflection.Register(s)
- manager.server = &grpcServer{
- server: s,
- lis: lis,
- }
- logrus.Info("mq server api run with gRPC")
- }
-
- return manager, nil
-}
-
-//Start 启动
-func (m *Manager) Start(errChan chan error) {
- logrus.Infof("api server start listening on 0.0.0.0:%d", m.conf.APIPort)
- err := m.actionMQ.Start()
- if err != nil {
- errChan <- err
- }
- go func() {
- if err := m.server.Server(); err != nil {
- logrus.Error("mq api listen error.", err.Error())
- errChan <- err
- }
- }()
-}
-
-func (m *Manager) doc() {
- // Optionally, you can install the Swagger Service which provides a nice Web UI on your REST API
- // You need to download the Swagger HTML5 assets and change the FilePath location in the config below.
- // Open http://localhost:8080/apidocs and enter http://localhost:8080/swagger.json in the api input field.
- config := swagger.Config{
- WebServices: m.container.RegisteredWebServices(), // you control what services are visible
- ApiPath: "/swagger.json",
-
- // Optionally, specify where the UI is located
- SwaggerPath: "/apidocs/",
- Info: swagger.Info{
- Title: "goodrain mq api doc.",
- },
- ApiVersion: "1.0",
- SwaggerFilePath: "./dist"}
- swagger.RegisterSwaggerService(config, m.container)
-
-}
-
-//Stop 停止
-func (m *Manager) Stop() error {
- logrus.Info("api server is stoping.")
- m.cancel()
- //m.server.Close()
- return m.actionMQ.Stop()
-}
-
-//Prometheus prometheus init
-func (m *Manager) Prometheus() {
- prometheus.MustRegister(version.NewCollector("acp_mq"))
- exporter := monitor.NewExporter(m.actionMQ)
- prometheus.MustRegister(exporter)
- http.Handle("/metrics", promhttp.Handler())
-}
-
-func health() {
- http.HandleFunc("/health", checkHalth)
-}
-
-func checkHalth(w http.ResponseWriter, r *http.Request) {
- httputil.ReturnSuccess(r, w, map[string]string{"status": "health", "info": "mq service health"})
-}
diff --git a/mq/api/mq/mq.go b/mq/api/mq/mq.go
index 11e6f8b815..3716aca5b5 100644
--- a/mq/api/mq/mq.go
+++ b/mq/api/mq/mq.go
@@ -19,19 +19,14 @@
package mq
import (
- "github.com/coreos/etcd/clientv3"
+ "github.com/goodrain/rainbond/cmd/mq/option"
+ "github.com/goodrain/rainbond/mq/client"
"os"
"strings"
"sync"
- "time"
-
- "github.com/goodrain/rainbond/cmd/mq/option"
- "github.com/goodrain/rainbond/mq/client"
"golang.org/x/net/context"
- etcdutil "github.com/goodrain/rainbond/util/etcd"
-
"github.com/sirupsen/logrus"
)
@@ -63,28 +58,17 @@ func NewActionMQ(ctx context.Context, c option.Config) ActionMQ {
}
type etcdQueue struct {
+ client *KeyValueStore
config option.Config
ctx context.Context
queues map[string]string
queuesLock sync.Mutex
- client *clientv3.Client
}
func (e *etcdQueue) Start() error {
logrus.Debug("etcd message queue client starting")
- etcdClientArgs := &etcdutil.ClientArgs{
- Endpoints: e.config.EtcdEndPoints,
- CaFile: e.config.EtcdCaFile,
- CertFile: e.config.EtcdCertFile,
- KeyFile: e.config.EtcdKeyFile,
- DialTimeout: time.Duration(e.config.EtcdTimeout) * time.Second,
- }
- cli, err := etcdutil.NewClient(context.Background(), etcdClientArgs)
- if err != nil {
- etcdutil.HandleEtcdError(err)
- return err
- }
- e.client = cli
+
+ e.client = NewKeyValueStore()
topics := os.Getenv("topics")
if topics != "" {
ts := strings.Split(topics, ",")
@@ -121,9 +105,7 @@ func (e *etcdQueue) GetAllTopics() []string {
}
func (e *etcdQueue) Stop() error {
- if e.client != nil {
- e.client.Close()
- }
+
return nil
}
func (e *etcdQueue) queueKey(topic string) string {
@@ -131,25 +113,16 @@ func (e *etcdQueue) queueKey(topic string) string {
}
func (e *etcdQueue) Enqueue(ctx context.Context, topic, value string) error {
EnqueueNumber++
- queue := etcdutil.NewQueue(ctx, e.client, e.queueKey(topic))
- return queue.Enqueue(value)
+ e.client.Put(e.queueKey(topic), value)
+ return nil
}
func (e *etcdQueue) Dequeue(ctx context.Context, topic string) (string, error) {
DequeueNumber++
- queue := etcdutil.NewQueue(ctx, e.client, e.queueKey(topic))
- return queue.Dequeue()
+ res, _ := e.client.Get(e.queueKey(topic))
+ return res, nil
}
func (e *etcdQueue) MessageQueueSize(topic string) int64 {
- ctx, cancel := context.WithCancel(e.ctx)
- defer cancel()
- res, err := e.client.Get(ctx, e.queueKey(topic), clientv3.WithPrefix())
- if err != nil {
- logrus.Errorf("get message queue size failure %s", err.Error())
- }
- if res != nil {
- return res.Count
- }
- return 0
+ return e.client.Size(topic)
}
diff --git a/mq/api/mq/store.go b/mq/api/mq/store.go
new file mode 100644
index 0000000000..46f5ce36f5
--- /dev/null
+++ b/mq/api/mq/store.go
@@ -0,0 +1,81 @@
+package mq
+
+import (
+ "sync"
+ "time"
+)
+
+// KeyValueStore 是一个简单的键值存储结构,支持多值
+type KeyValueStore struct {
+ data map[string][]string
+ mu sync.Mutex
+ cond *sync.Cond
+}
+
+// NewKeyValueStore 创建一个新的键值存储实例
+func NewKeyValueStore() *KeyValueStore {
+ kv := &KeyValueStore{
+ data: make(map[string][]string),
+ }
+ kv.cond = sync.NewCond(&kv.mu)
+ return kv
+}
+
+// Put 将键值对放入存储
+func (kv *KeyValueStore) Put(key, value string) {
+ kv.mu.Lock()
+ defer kv.mu.Unlock()
+
+ // 如果键已存在,则追加值;否则,创建新的值切片
+ if existingValues, ok := kv.data[key]; ok {
+ kv.data[key] = append(existingValues, value)
+ } else {
+ kv.data[key] = []string{value}
+ }
+ kv.cond.Broadcast()
+}
+
+// Get 根据键获取第一个值并删除该键值对中的第一个值
+func (kv *KeyValueStore) Get(key string) (string, bool) {
+ kv.mu.Lock()
+ defer kv.mu.Unlock()
+ // 如果值列表为空,等待通知或超时
+ timeout := time.Now().Add(5 * time.Second)
+ for len(kv.data[key]) == 0 {
+ select {
+ case <-time.After(timeout.Sub(time.Now())):
+ return "", false // 超时后返回空值
+ default:
+ waitChan := make(chan struct{})
+ kv.cond.L.Unlock()
+ go func() {
+ kv.cond.L.Lock()
+ close(waitChan)
+ }()
+ <-waitChan
+ }
+ }
+
+ // 获取值列表
+ values, ok := kv.data[key]
+ if ok && len(values) > 0 {
+ // 获取第一个值并从切片中删除
+ firstValue := values[0]
+ if len(values) == 1 {
+ delete(kv.data, key) // 如果只有一个值,删除整个键值对
+ } else {
+ kv.data[key] = values[1:] // 如果有多个值,删除第一个值
+ }
+ return firstValue, true
+ }
+
+ return "", false
+}
+
+// Size 返回特定键的值列表大小
+func (kv *KeyValueStore) Size(topic string) int64 {
+ kv.mu.Lock()
+ defer kv.mu.Unlock()
+
+ return int64(len(kv.data[topic]))
+}
diff --git a/mq/mqcomponent/grpcserver/server.go b/mq/mqcomponent/grpcserver/server.go
new file mode 100644
index 0000000000..2510d7cc50
--- /dev/null
+++ b/mq/mqcomponent/grpcserver/server.go
@@ -0,0 +1,54 @@
+package grpcserver
+
+import (
+ "context"
+ "fmt"
+ "github.com/goodrain/rainbond/config/configs"
+ grpcserver "github.com/goodrain/rainbond/mq/api/grpc/server"
+ "github.com/goodrain/rainbond/mq/mqcomponent/mqclient"
+ "github.com/goodrain/rainbond/pkg/gogo"
+ "github.com/sirupsen/logrus"
+ "google.golang.org/grpc"
+ "net"
+)
+
+// NewGrpcServer -
+func NewGrpcServer() *Component {
+ return &Component{}
+}
+
+// Component -
+type Component struct {
+ server *grpc.Server
+ lis net.Listener
+}
+
+// StartCancel -
+func (c *Component) StartCancel(ctx context.Context, cancel context.CancelFunc, cfg *configs.Config) error {
+ s := grpc.NewServer()
+ c.server = s
+ grpcserver.RegisterServer(s, mqclient.Default().ActionMQ())
+ return gogo.Go(func(ctx context.Context) (err error) {
+ defer cancel()
+ c.lis, err = net.Listen("tcp", fmt.Sprintf(":%d", cfg.MQConfig.APIPort))
+ logrus.Infof("grpc server listen on %d", cfg.MQConfig.APIPort)
+ if err := s.Serve(c.lis); err != nil {
+ logrus.Error("mq api grpc listen error.", err.Error())
+ return err
+ }
+ return err
+ })
+}
+
+// Start -
+func (c *Component) Start(ctx context.Context, cfg *configs.Config) (err error) {
+ panic("implement me")
+}
+
+// CloseHandle -
+func (c *Component) CloseHandle() {
+ err := c.lis.Close()
+ if err != nil {
+ logrus.Errorf("failed to close listener: %v", err)
+ }
+}
diff --git a/mq/mqcomponent/metrics/server.go b/mq/mqcomponent/metrics/server.go
new file mode 100644
index 0000000000..065b1d92bf
--- /dev/null
+++ b/mq/mqcomponent/metrics/server.go
@@ -0,0 +1,53 @@
+package metrics
+
+import (
+ "context"
+ "github.com/goodrain/rainbond/mq/monitor"
+ "github.com/goodrain/rainbond/mq/mqcomponent/mqclient"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+
+ "github.com/goodrain/rainbond/config/configs"
+ "github.com/goodrain/rainbond/pkg/gogo"
+ httputil "github.com/goodrain/rainbond/util/http"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/common/version"
+ "github.com/sirupsen/logrus"
+ "net/http"
+)
+
+// Server 要做一些监控或者指标收集
+type Server struct {
+}
+
+// Start -
+func (s *Server) Start(ctx context.Context, cfg *configs.Config) error {
+ return s.StartCancel(ctx, nil, cfg)
+}
+
+// CloseHandle -
+func (s *Server) CloseHandle() {
+}
+
+// NewMetricsServer -
+func NewMetricsServer() *Server {
+ return &Server{}
+}
+
+// StartCancel -
+func (s *Server) StartCancel(ctx context.Context, cancel context.CancelFunc, cfg *configs.Config) error {
+ prometheus.MustRegister(version.NewCollector("acp_mq"))
+ prometheus.MustRegister(monitor.NewExporter(mqclient.Default().ActionMQ()))
+ http.Handle("/metrics", promhttp.Handler())
+ http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
+ httputil.ReturnSuccess(r, w, map[string]string{"status": "health", "info": "mq service health"})
+ })
+ return gogo.Go(func(ctx context.Context) error {
+ logrus.Infof("start metrics server")
+ defer cancel()
+ if err := http.ListenAndServe(":6301", nil); err != nil {
+ logrus.Error("mq pprof listen error.", err.Error())
+ return err
+ }
+ return nil
+ })
+}
diff --git a/mq/mqcomponent/mqclient/client.go b/mq/mqcomponent/mqclient/client.go
new file mode 100644
index 0000000000..213728a813
--- /dev/null
+++ b/mq/mqcomponent/mqclient/client.go
@@ -0,0 +1,33 @@
+package mqclient
+
+import (
+ "context"
+ "github.com/goodrain/rainbond/config/configs"
+ "github.com/goodrain/rainbond/mq/api/mq"
+)
+
+var defaultMQClientComponent *Component
+
+type Component struct {
+ actionMQ mq.ActionMQ
+}
+
+func Default() *Component {
+ return defaultMQClientComponent
+}
+func (m *Component) ActionMQ() mq.ActionMQ {
+ return m.actionMQ
+}
+
+func (m *Component) Start(ctx context.Context, cfg *configs.Config) error {
+ m.actionMQ = mq.NewActionMQ(ctx, cfg.MQConfig)
+ return m.actionMQ.Start()
+}
+
+func (m *Component) CloseHandle() {
+}
+
+func MQClient() *Component {
+ defaultMQClientComponent = &Component{}
+ return defaultMQClientComponent
+}
diff --git a/node/api/router/router.go b/node/api/router/router.go
index f5e34895ce..6ccb0e9a8d 100644
--- a/node/api/router/router.go
+++ b/node/api/router/router.go
@@ -20,8 +20,6 @@ package router
import (
"github.com/goodrain/rainbond/pkg/interceptors"
- "time"
-
"github.com/sirupsen/logrus"
"github.com/goodrain/rainbond/node/api/controller"
@@ -44,7 +42,7 @@ func Routers(mode string) *chi.Mux {
//Gracefully absorb panics and prints the stack trace
r.Use(interceptors.Recoverer)
//request time out
- r.Use(middleware.Timeout(time.Second * 5))
+ //r.Use(middleware.Timeout(time.Second * 5))
r.Mount("/v1", DisconverRoutes())
r.Route("/v2", func(r chi.Router) {
r.Get("/ping", controller.Ping)
diff --git a/pkg/component/core.go b/pkg/component/core.go
index e30dece6ad..b1d08f1d54 100644
--- a/pkg/component/core.go
+++ b/pkg/component/core.go
@@ -26,6 +26,9 @@ import (
"github.com/goodrain/rainbond/api/server"
"github.com/goodrain/rainbond/config/configs"
"github.com/goodrain/rainbond/event"
+ "github.com/goodrain/rainbond/mq/mqcomponent/grpcserver"
+ "github.com/goodrain/rainbond/mq/mqcomponent/metrics"
+ "github.com/goodrain/rainbond/mq/mqcomponent/mqclient"
"github.com/goodrain/rainbond/pkg/component/etcd"
"github.com/goodrain/rainbond/pkg/component/grpc"
"github.com/goodrain/rainbond/pkg/component/hubregistry"
@@ -128,3 +131,15 @@ func Proxy() rainbond.FuncComponent {
return nil
}
}
+
+func MQHealthServer() rainbond.ComponentCancel {
+ return metrics.NewMetricsServer()
+}
+
+func MQGrpcServer() rainbond.ComponentCancel {
+ return grpcserver.NewGrpcServer()
+}
+
+func MQClient() rainbond.Component {
+ return mqclient.MQClient()
+}
diff --git a/util/etcd/queue.go b/util/etcd/queue.go
deleted file mode 100644
index 84241760a8..0000000000
--- a/util/etcd/queue.go
+++ /dev/null
@@ -1,94 +0,0 @@
-// Copyright (C) 2014-2018 Goodrain Co., Ltd.
-// RAINBOND, Application Management Platform
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version. For any non-GPL usage of Rainbond,
-// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
-// must be obtained first.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see .
-
-package etcd
-
-import (
- "fmt"
-
- v3 "github.com/coreos/etcd/clientv3"
- "github.com/coreos/etcd/mvcc/mvccpb"
- "golang.org/x/net/context"
-)
-
-// Queue implements a multi-reader, multi-writer distributed queue.
-type Queue struct {
- client *v3.Client
- ctx context.Context
-
- keyPrefix string
-}
-
-// NewQueue new queue
-func NewQueue(ctx context.Context, client *v3.Client, keyPrefix string) *Queue {
- return &Queue{client, ctx, keyPrefix}
-}
-
-// Enqueue en queue
-func (q *Queue) Enqueue(val string) error {
- _, err := newUniqueKV(q.ctx, q.client, q.keyPrefix, val)
- return err
-}
-
-// Dequeue returns Enqueue()'d elements in FIFO order. If the
-// queue is empty, Dequeue blocks until elements are available.
-func (q *Queue) Dequeue() (string, error) {
- for {
- // TODO: fewer round trips by fetching more than one key
- resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
- if err != nil {
- return "", err
- }
-
- kv, err := claimFirstKey(q.ctx, q.client, resp.Kvs)
- if err != nil {
- return "", err
- } else if kv != nil {
- return string(kv.Value), nil
- } else if resp.More {
- // missed some items, retry to read in more
- return q.Dequeue()
- }
-
- // nothing yet; wait on elements
- ev, err := WaitPrefixEvents(
- q.client,
- q.keyPrefix,
- resp.Header.Revision,
- []mvccpb.Event_EventType{mvccpb.PUT})
- if err != nil {
- if err == ErrNoUpdateForLongTime {
- continue
- }
- return "", err
- }
- if ev == nil {
- return "", fmt.Errorf("event is nil")
- }
- if ev.Kv == nil {
- return "", fmt.Errorf("event key value is nil")
- }
- ok, err := deleteRevKey(q.ctx, q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
- if err != nil {
- return "", err
- } else if !ok {
- return q.Dequeue()
- }
- return string(ev.Kv.Value), err
- }
-}
diff --git a/util/etcd/watch.go b/util/etcd/watch.go
deleted file mode 100644
index 9cf108c13a..0000000000
--- a/util/etcd/watch.go
+++ /dev/null
@@ -1,78 +0,0 @@
-// Copyright (C) 2014-2018 Goodrain Co., Ltd.
-// RAINBOND, Application Management Platform
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version. For any non-GPL usage of Rainbond,
-// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
-// must be obtained first.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see .
-
-package etcd
-
-import (
- "fmt"
- "time"
-
- "github.com/coreos/etcd/clientv3"
- "github.com/coreos/etcd/mvcc/mvccpb"
- "github.com/sirupsen/logrus"
- "golang.org/x/net/context"
-)
-
-//ErrNoUpdateForLongTime no update for long time , can reobservation of synchronous data
-var ErrNoUpdateForLongTime = fmt.Errorf("not updated for a long time")
-
-//WaitPrefixEvents WaitPrefixEvents
-func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []mvccpb.Event_EventType) (*clientv3.Event, error) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- logrus.Debug("start watch message from etcd queue")
- wc := clientv3.NewWatcher(c).Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
- if wc == nil {
- return nil, ErrNoWatcher
- }
- event := waitEvents(wc, evs)
- if event != nil {
- return event, nil
- }
- logrus.Debug("queue watcher sync, because of not updated for a long time")
- return nil, ErrNoUpdateForLongTime
-}
-
-//waitEvents this will return nil
-func waitEvents(wc clientv3.WatchChan, evs []mvccpb.Event_EventType) *clientv3.Event {
- i := 0
- timer := time.NewTimer(time.Second * 30)
- defer timer.Stop()
- for {
- select {
- case wresp := <-wc:
- if wresp.Err() != nil {
- logrus.Errorf("watch event failure %s", wresp.Err().Error())
- return nil
- }
- if len(wresp.Events) == 0 {
- return nil
- }
- for _, ev := range wresp.Events {
- if ev.Type == evs[i] {
- i++
- if i == len(evs) {
- return ev
- }
- }
- }
- case <-timer.C:
- return nil
- }
- }
-}