Skip to content

Commit

Permalink
perf: remove all etcd
Browse files Browse the repository at this point in the history
Signed-off-by: 逆流而上 <[email protected]>
  • Loading branch information
DokiDoki1103 committed Feb 22, 2024
1 parent fe56601 commit 29497d1
Show file tree
Hide file tree
Showing 14 changed files with 14 additions and 1,497 deletions.
4 changes: 4 additions & 0 deletions builder/exector/exector.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,11 @@ func (e *exectorManager) runTask(f func(task *pb.TaskMessage), task *pb.TaskMess
e.runningTask.Delete(task.TaskId)
logrus.Infof("Build task %s is completed", task.TaskId)
}

func (e *exectorManager) runTaskWithErr(f func(task *pb.TaskMessage) error, task *pb.TaskMessage, concurrencyControl bool) {
if task.TaskType == "" || task.TaskId == "" {
return
}
logrus.Infof("Build task %s in progress", task.TaskId)
e.runningTask.LoadOrStore(task.TaskId, task)
//Remove a task that is being executed, not necessarily a task that is currently completed
Expand Down
71 changes: 2 additions & 69 deletions cmd/eventlog/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@
package server

import (
"context"
"fmt"
"os"
"os/signal"
"path"
"syscall"
"time"

"github.com/goodrain/rainbond/discover"
"github.com/goodrain/rainbond/eventlog/cluster"
"github.com/goodrain/rainbond/eventlog/conf"
"github.com/goodrain/rainbond/eventlog/db"
"github.com/goodrain/rainbond/eventlog/entry"
"github.com/goodrain/rainbond/eventlog/exit/web"
"github.com/goodrain/rainbond/eventlog/store"
etcdutil "github.com/goodrain/rainbond/util/etcd"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
)
Expand All @@ -45,7 +41,6 @@ type LogServer struct {
Entry *entry.Entry
Logger *logrus.Logger
SocketServer *web.SocketServer
Cluster cluster.Cluster
}

// NewLogServer creates a new NewLogServer.
Expand All @@ -68,16 +63,9 @@ func (s *LogServer) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&s.Conf.Entry.MonitorMessageServer.SubAddress, "monitor.subaddress", []string{"tcp://127.0.0.1:9442"}, "monitor message source address")
fs.IntVar(&s.Conf.Entry.MonitorMessageServer.CacheMessageSize, "monitor.cache", 200, "the monitor sub server cache the receive message size")
fs.StringVar(&s.Conf.Entry.MonitorMessageServer.SubSubscribe, "monitor.subscribe", "ceptop", "the monitor message sub server subscribe info")
fs.BoolVar(&s.Conf.ClusterMode, "cluster", true, "Whether open cluster mode")
fs.StringVar(&s.Conf.Cluster.Discover.InstanceIP, "cluster.instance.ip", "", "The current instance IP in the cluster can be communications.")
fs.StringVar(&s.Conf.Cluster.Discover.Type, "discover.type", "etcd", "the instance in cluster auto discover way.")
fs.StringSliceVar(&s.Conf.Cluster.Discover.EtcdAddr, "discover.etcd.addr", []string{"http://rbd-etcd:2379"}, "set all etcd server addr in cluster for message instence auto discover.")
fs.StringVar(&s.Conf.Cluster.Discover.EtcdCaFile, "discover.etcd.ca", "", "verify etcd certificates of TLS-enabled secure servers using this CA bundle")
fs.StringVar(&s.Conf.Cluster.Discover.EtcdCertFile, "discover.etcd.cert", "", "identify secure etcd client using this TLS certificate file")
fs.StringVar(&s.Conf.Cluster.Discover.EtcdKeyFile, "discover.etcd.key", "", "identify secure etcd client using this TLS key file")
fs.StringVar(&s.Conf.Cluster.Discover.HomePath, "discover.etcd.homepath", "/event", "etcd home key")
fs.StringVar(&s.Conf.Cluster.Discover.EtcdUser, "discover.etcd.user", "", "etcd server user info")
fs.StringVar(&s.Conf.Cluster.Discover.EtcdPass, "discover.etcd.pass", "", "etcd server user password")
fs.StringVar(&s.Conf.Cluster.PubSub.PubBindIP, "cluster.bind.ip", "0.0.0.0", "Cluster communication to listen the IP")
fs.IntVar(&s.Conf.Cluster.PubSub.PubBindPort, "cluster.bind.port", 6365, "Cluster communication to listen the Port")
fs.StringVar(&s.Conf.EventStore.MessageType, "message.type", "json", "Receive and transmit the log message type.")
Expand Down Expand Up @@ -157,9 +145,6 @@ func (s *LogServer) InitLog() {

// InitConf 初始化配置
func (s *LogServer) InitConf() {
s.Conf.Cluster.Discover.ClusterMode = s.Conf.ClusterMode
s.Conf.Cluster.PubSub.ClusterMode = s.Conf.ClusterMode
s.Conf.EventStore.ClusterMode = s.Conf.ClusterMode
s.Conf.Cluster.Discover.DockerLogPort = s.Conf.Entry.DockerLogServer.BindPort
s.Conf.Cluster.Discover.WebPort = s.Conf.WebSocket.BindPort
if os.Getenv("MYSQL_HOST") != "" && os.Getenv("MYSQL_USER") != "" && os.Getenv("MYSQL_PASSWORD") != "" {
Expand All @@ -176,21 +161,6 @@ func (s *LogServer) Run() error {
s.Logger.Debug("Start run server.")
log := s.Logger

etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: s.Conf.Cluster.Discover.EtcdAddr,
CaFile: s.Conf.Cluster.Discover.EtcdCaFile,
CertFile: s.Conf.Cluster.Discover.EtcdCertFile,
KeyFile: s.Conf.Cluster.Discover.EtcdKeyFile,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

etcdClient, err := etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {
return err
}

//init new db
if err := db.CreateDBManager(s.Conf.EventStore.DB); err != nil {
logrus.Infof("create db manager error, %v", err)
Expand All @@ -206,15 +176,9 @@ func (s *LogServer) Run() error {
return err
}
defer storeManager.Stop()
if s.Conf.ClusterMode {
s.Cluster = cluster.NewCluster(etcdClient, s.Conf.Cluster, log.WithField("module", "Cluster"), storeManager)
if err := s.Cluster.Start(); err != nil {
return err
}
defer s.Cluster.Stop()
}

s.SocketServer = web.NewSocket(s.Conf.WebSocket, s.Conf.Cluster.Discover,
log.WithField("module", "SocketServer"), storeManager, s.Cluster, healthInfo)
log.WithField("module", "SocketServer"), storeManager, healthInfo)
if err := s.SocketServer.Run(); err != nil {
return err
}
Expand All @@ -226,37 +190,6 @@ func (s *LogServer) Run() error {
}
defer s.Entry.Stop()

//服务注册
grpckeepalive, err := discover.CreateKeepAlive(etcdClientArgs, "event_log_event_grpc",
s.Conf.Cluster.Discover.NodeID, s.Conf.Cluster.Discover.InstanceIP, s.Conf.Entry.EventLogServer.BindPort)
if err != nil {
return err
}
if err := grpckeepalive.Start(); err != nil {
return err
}
defer grpckeepalive.Stop()

udpkeepalive, err := discover.CreateKeepAlive(etcdClientArgs, "event_log_event_udp",
s.Conf.Cluster.Discover.NodeID, s.Conf.Cluster.Discover.InstanceIP, s.Conf.Entry.NewMonitorMessageServerConf.ListenerPort)
if err != nil {
return err
}
if err := udpkeepalive.Start(); err != nil {
return err
}
defer udpkeepalive.Stop()

httpkeepalive, err := discover.CreateKeepAlive(etcdClientArgs, "event_log_event_http",
s.Conf.Cluster.Discover.NodeID, s.Conf.Cluster.Discover.InstanceIP, s.Conf.WebSocket.BindPort)
if err != nil {
return err
}
if err := httpkeepalive.Start(); err != nil {
return err
}
defer httpkeepalive.Stop()

term := make(chan os.Signal)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
select {
Expand Down
158 changes: 0 additions & 158 deletions eventlog/cluster/cluster.go

This file was deleted.

Loading

0 comments on commit 29497d1

Please sign in to comment.