diff --git a/cmd/commands/run.go b/cmd/commands/run.go index 4af82e9..e528f0d 100644 --- a/cmd/commands/run.go +++ b/cmd/commands/run.go @@ -2,13 +2,13 @@ package commands import ( "errors" - "log" "os" "os/signal" "syscall" "github.com/dezh-tech/immortal/cmd/relay" "github.com/dezh-tech/immortal/config" + "github.com/dezh-tech/immortal/pkg/logger" ) func HandleRun(args []string) { @@ -16,13 +16,13 @@ func HandleRun(args []string) { ExitOnError(errors.New("at least 1 arguments expected\nuse help command for more information")) } - log.Println("loading config...") - cfg, err := config.Load(args[2]) if err != nil { ExitOnError(err) } + logger.InitGlobalLogger(&cfg.Logger) + r, err := relay.New(cfg) if err != nil { ExitOnError(err) @@ -35,13 +35,13 @@ func HandleRun(args []string) { select { case sig := <-sigChan: - log.Printf("Received signal: %s\nInitiating graceful shutdown...\n", sig.String()) //nolint + logger.Info("Received signal: Initiating graceful shutdown", "signal", sig.String()) if err := r.Stop(); err != nil { ExitOnError(err) } case err := <-errCh: - log.Printf("Unexpected error: %v\nInitiating shutdown due to the error...\n", err) //nolint + logger.Info("Unexpected error: Initiating shutdown due to the error", "err", err) if err := r.Stop(); err != nil { ExitOnError(err) } diff --git a/cmd/commands/utils.go b/cmd/commands/utils.go index b974b1d..69452d1 100644 --- a/cmd/commands/utils.go +++ b/cmd/commands/utils.go @@ -1,11 +1,12 @@ package commands import ( - "log" "os" + + "github.com/dezh-tech/immortal/pkg/logger" ) func ExitOnError(err error) { - log.Printf("immortal error: %s\n", err.Error()) //nolint + logger.Error("immortal error", "err", err.Error()) os.Exit(1) } diff --git a/cmd/relay/relay.go b/cmd/relay/relay.go index b735223..ccfcfae 100644 --- a/cmd/relay/relay.go +++ b/cmd/relay/relay.go @@ -3,7 +3,6 @@ package relay import ( "context" "fmt" - "log" "time" "github.com/dezh-tech/immortal/config" @@ -13,6 +12,7 @@ import ( grpcclient "github.com/dezh-tech/immortal/infrastructure/grpc_client" "github.com/dezh-tech/immortal/infrastructure/metrics" "github.com/dezh-tech/immortal/infrastructure/redis" + "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/repository" ) @@ -39,13 +39,13 @@ func New(cfg *config.Config) (*Relay, error) { return nil, err } - c, err := grpcclient.New(cfg.GRPCClient.Endpoint) + c, err := grpcclient.New(cfg.GRPCClient.Endpoint, cfg.GRPCClient) if err != nil { return nil, err } resp, err := c.RegisterService(context.Background(), fmt.Sprint(cfg.GRPCServer.Port), - cfg.GRPCClient.Region, cfg.GRPCClient.Heartbeat) + cfg.GRPCClient.Region) if err != nil { return nil, err } @@ -54,7 +54,9 @@ func New(cfg *config.Config) (*Relay, error) { return nil, fmt.Errorf("cant register to master: %s", *resp.Message) } - params, err := c.GetParameters(context.Background(), resp.Token) + c.SetID(resp.Token) + + params, err := c.GetParameters(context.Background()) if err != nil { return nil, err } @@ -64,9 +66,9 @@ func New(cfg *config.Config) (*Relay, error) { return nil, err } - h := repository.New(db, cfg.Handler) + h := repository.New(cfg.Handler, db, c) - ws, err := websocket.New(cfg.WebsocketServer, h, m, r) + ws, err := websocket.New(cfg.WebsocketServer, h, m, r, c) if err != nil { return nil, err } @@ -84,7 +86,8 @@ func New(cfg *config.Config) (*Relay, error) { // Start runs the relay and its children. func (r *Relay) Start() chan error { - log.Println("relay started successfully...") + logger.Info("starting the relay") + errCh := make(chan error, 2) go func() { @@ -104,7 +107,7 @@ func (r *Relay) Start() chan error { // Stop shutdowns the relay and its children gracefully. func (r *Relay) Stop() error { - log.Println("stopping relay...") + logger.Info("stopping the relay") if err := r.websocketServer.Stop(); err != nil { return err @@ -114,5 +117,9 @@ func (r *Relay) Stop() error { return err } - return r.database.Stop() + if err := r.database.Stop(); err != nil { + return err + } + + return nil } diff --git a/config/config.go b/config/config.go index 008a143..90141c3 100644 --- a/config/config.go +++ b/config/config.go @@ -8,6 +8,7 @@ import ( "github.com/dezh-tech/immortal/infrastructure/database" grpcclient "github.com/dezh-tech/immortal/infrastructure/grpc_client" "github.com/dezh-tech/immortal/infrastructure/redis" + "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/repository" "github.com/joho/godotenv" "gopkg.in/yaml.v3" @@ -21,6 +22,7 @@ type Config struct { Database database.Config `yaml:"database"` RedisConf redis.Config `yaml:"redis"` GRPCServer grpc.Config `yaml:"grpc_server"` + Logger logger.Config `yaml:"logger"` Handler repository.Config } diff --git a/config/config.yml b/config/config.yml index 242f5e2..8296ea9 100644 --- a/config/config.yml +++ b/config/config.yml @@ -42,6 +42,10 @@ manager: # default is global. region: global + # stack is the stack name which the immortal instance is located in. + # default is immortal. + stack: immortal + # database contains details of database connections and limitations. database: # db_name is the name of mongodb related to immortal @@ -77,3 +81,25 @@ redis: # white_list_filter_name specifies the name of whitelist cuckoo filter key # default is IMMO_WHITE_LIST. white_list_filter_name: IMMO_WHITE_LIST + + +# log contains configs for logs output in console and file. +logger: + # level determine the log level. + level: "info" + + # filename is the output file containing logs. + filename: "immortal.log" + + # max_size_in_mb is the maximum size of log file. + max_size_in_mb: 10 + + # max_backups determines the maximum number of backup of log file size. + max_backups: 10 + + # compress determines to compress the log files or not. + compress: true + + # targets is targets for logs to be written to. + targets: [file, console] + \ No newline at end of file diff --git a/config/parameters.go b/config/parameters.go index fd678d6..e59b76e 100644 --- a/config/parameters.go +++ b/config/parameters.go @@ -3,8 +3,8 @@ package config import ( "github.com/dezh-tech/immortal/delivery/websocket" mpb "github.com/dezh-tech/immortal/infrastructure/grpc_client/gen" + "github.com/dezh-tech/immortal/pkg/utils" "github.com/dezh-tech/immortal/repository" - "github.com/dezh-tech/immortal/utils" ) type Parameters struct { diff --git a/delivery/grpc/server.go b/delivery/grpc/server.go index 42b83ff..de31b09 100644 --- a/delivery/grpc/server.go +++ b/delivery/grpc/server.go @@ -2,7 +2,6 @@ package grpc import ( "context" - "log" "net" "strconv" "time" @@ -10,6 +9,7 @@ import ( rpb "github.com/dezh-tech/immortal/delivery/grpc/gen" "github.com/dezh-tech/immortal/infrastructure/database" "github.com/dezh-tech/immortal/infrastructure/redis" + "github.com/dezh-tech/immortal/pkg/logger" "google.golang.org/grpc" ) @@ -44,8 +44,6 @@ func (s *Server) Start() error { return err } - log.Println("grpc server started...") - grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor()) healthServer := newHealthServer(s) @@ -55,18 +53,20 @@ func (s *Server) Start() error { s.listener = listener s.grpc = grpcServer - return s.grpc.Serve(listener) + logger.Info("gRPC server started successfully", "listen", listener.Addr().String()) + + if err := s.grpc.Serve(listener); err != nil { + return err + } + + return nil } func (s *Server) Stop() error { - s.cancel() - - log.Println("grpc server stopped...") + logger.Info("stopping gRPC server") + s.cancel() s.grpc.Stop() - if err := s.listener.Close(); err != nil { - return err - } return nil } diff --git a/delivery/websocket/auth_handler.go b/delivery/websocket/auth_handler.go index b709faf..cfbb2f0 100644 --- a/delivery/websocket/auth_handler.go +++ b/delivery/websocket/auth_handler.go @@ -3,9 +3,9 @@ package websocket import ( "fmt" + "github.com/dezh-tech/immortal/pkg/utils" "github.com/dezh-tech/immortal/types" "github.com/dezh-tech/immortal/types/message" - "github.com/dezh-tech/immortal/utils" "github.com/gorilla/websocket" ) diff --git a/delivery/websocket/event_handler.go b/delivery/websocket/event_handler.go index d376cae..c23d16f 100644 --- a/delivery/websocket/event_handler.go +++ b/delivery/websocket/event_handler.go @@ -7,8 +7,8 @@ import ( "strconv" "time" + "github.com/dezh-tech/immortal/pkg/utils" "github.com/dezh-tech/immortal/types/message" - "github.com/dezh-tech/immortal/utils" "github.com/gorilla/websocket" ) diff --git a/delivery/websocket/req_handler.go b/delivery/websocket/req_handler.go index 7944309..a9f1c71 100644 --- a/delivery/websocket/req_handler.go +++ b/delivery/websocket/req_handler.go @@ -3,8 +3,8 @@ package websocket import ( "fmt" + "github.com/dezh-tech/immortal/pkg/utils" "github.com/dezh-tech/immortal/types/message" - "github.com/dezh-tech/immortal/utils" "github.com/gorilla/websocket" ) diff --git a/delivery/websocket/server.go b/delivery/websocket/server.go index 68db8d9..388f782 100644 --- a/delivery/websocket/server.go +++ b/delivery/websocket/server.go @@ -2,14 +2,15 @@ package websocket import ( "fmt" - "log" "net" "net/http" "strconv" "sync" + grpcclient "github.com/dezh-tech/immortal/infrastructure/grpc_client" "github.com/dezh-tech/immortal/infrastructure/metrics" "github.com/dezh-tech/immortal/infrastructure/redis" + "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/repository" "github.com/dezh-tech/immortal/types/filter" "github.com/dezh-tech/immortal/types/message" @@ -29,9 +30,11 @@ type Server struct { handler *repository.Handler metrics *metrics.Metrics redis *redis.Redis + grpc grpcclient.IClient } -func New(cfg Config, h *repository.Handler, m *metrics.Metrics, r *redis.Redis, +func New(cfg Config, h *repository.Handler, m *metrics.Metrics, + r *redis.Redis, grpc grpcclient.IClient, ) (*Server, error) { return &Server{ config: cfg, @@ -40,18 +43,21 @@ func New(cfg Config, h *repository.Handler, m *metrics.Metrics, r *redis.Redis, handler: h, metrics: m, redis: r, + grpc: grpc, }, nil } // Start starts a new server instance. func (s *Server) Start() error { - log.Println("websocket server started successfully...") - go s.checkExpiration() + addr := net.JoinHostPort(s.config.Bind, + strconv.Itoa(int(s.config.Port))) + + logger.Info("websocket server started", "listen", addr) + http.Handle("/", s) - err := http.ListenAndServe(net.JoinHostPort(s.config.Bind, //nolint - strconv.Itoa(int(s.config.Port))), nil) + err := http.ListenAndServe(addr, nil) //nolint return err } @@ -65,7 +71,9 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mu.Lock() - log.Println("new websocket connection: ", conn.RemoteAddr().String()) + logger.Debug("incoming websocket connection", + "addr", conn.RemoteAddr().String()) + s.metrics.Connections.Inc() known := false @@ -88,6 +96,9 @@ func (s *Server) readLoop(conn *websocket.Conn) { for { _, buf, err := conn.ReadMessage() if err != nil { + logger.Debug("failed to read form connection", "conn", + conn.RemoteAddr().String(), "err", err.Error()) + // clean up closed connection. s.mu.Lock() @@ -116,6 +127,9 @@ func (s *Server) readLoop(conn *websocket.Conn) { s.metrics.MessagesTotal.Inc() + logger.Debug("incoming message", "conn", + conn.RemoteAddr().String(), "msg", msg.String()) + switch msg.Type() { case "REQ": go s.handleReq(conn, msg) @@ -137,7 +151,7 @@ func (s *Server) Stop() error { s.mu.Lock() defer s.mu.Unlock() - log.Println("stopping websocket server...") + logger.Info("stopping websocket server") for wsConn, client := range s.conns { client.Lock() diff --git a/delivery/websocket/task_scheduler.go b/delivery/websocket/task_scheduler.go index 8d2e082..40976f0 100644 --- a/delivery/websocket/task_scheduler.go +++ b/delivery/websocket/task_scheduler.go @@ -1,19 +1,28 @@ package websocket import ( + "context" + "fmt" "strconv" "strings" "time" + "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/types" ) const expirationTaskListName = "expiration_events" -func (s *Server) checkExpiration() { +func (s *Server) checkExpiration() { //nolint for range time.Tick(time.Minute) { tasks, err := s.redis.GetReadyTasks(expirationTaskListName) if err != nil { + _, err := s.grpc.AddLog(context.Background(), + fmt.Sprintf("redis error while receiving ready tasks: %v", err)) + if err != nil { + logger.Error("can't send log to manager", "err", err) + } + continue } diff --git a/go.mod b/go.mod index bb62522..2887f8d 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,14 @@ require ( github.com/mailru/easyjson v0.7.7 github.com/prometheus/client_golang v1.20.5 github.com/redis/go-redis/v9 v9.7.0 + github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.10.0 github.com/tidwall/gjson v1.18.0 go.mongodb.org/mongo-driver v1.17.1 golang.org/x/exp v0.0.0-20241210194714-1829a127f884 google.golang.org/grpc v1.69.0 google.golang.org/protobuf v1.35.2 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -30,6 +32,8 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/kr/text v0.2.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 0d82496..287f629 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,7 @@ github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6 github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -23,6 +24,7 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -45,10 +47,16 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= @@ -63,6 +71,9 @@ github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= @@ -114,6 +125,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -137,5 +151,7 @@ google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/infrastructure/database/database.go b/infrastructure/database/database.go index 5d2a8e0..0d4ccd3 100644 --- a/infrastructure/database/database.go +++ b/infrastructure/database/database.go @@ -2,9 +2,9 @@ package database import ( "context" - "log" "time" + "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/types" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -18,6 +18,8 @@ type Database struct { } func Connect(cfg Config) (*Database, error) { + logger.Info("connecting to database") + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.ConnectionTimeout)*time.Millisecond) defer cancel() @@ -55,6 +57,9 @@ func Connect(cfg Config) (*Database, error) { if err != nil { cancel() + // todo::: skip `Index already exists with a different name` errors. + logger.Error("can't create index for id field", "err", err) + continue } cancel() @@ -68,7 +73,11 @@ func Connect(cfg Config) (*Database, error) { } func (db *Database) Stop() error { - log.Println("closing database connection...") + logger.Info("closing database connection") + + if err := db.Client.Disconnect(context.Background()); err != nil { + return err + } - return db.Client.Disconnect(context.Background()) + return nil } diff --git a/infrastructure/grpc_client/client.go b/infrastructure/grpc_client/client.go index 573ee32..e73e85c 100644 --- a/infrastructure/grpc_client/client.go +++ b/infrastructure/grpc_client/client.go @@ -12,10 +12,13 @@ import ( type Client struct { RegistryService mpb.ServiceRegistryClient ParametersService mpb.ParametersClient + LogService mpb.LogClient + id string + config Config conn *grpc.ClientConn } -func New(endpoint string) (*Client, error) { +func New(endpoint string, cfg Config) (IClient, error) { conn, err := grpc.NewClient(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err @@ -24,24 +27,37 @@ func New(endpoint string) (*Client, error) { return &Client{ RegistryService: mpb.NewServiceRegistryClient(conn), ParametersService: mpb.NewParametersClient(conn), + LogService: mpb.NewLogClient(conn), + config: cfg, conn: conn, }, nil } +func (c *Client) SetID(id string) { + c.id = id +} + func (c *Client) RegisterService(ctx context.Context, - port, region string, hb uint32, + port, region string, ) (*mpb.RegisterServiceResponse, error) { return c.RegistryService.RegisterService(ctx, &mpb.RegisterServiceRequest{ Type: mpb.ServiceTypeEnum_RELAY, Port: port, - HeartbeatDurationInSec: hb, + HeartbeatDurationInSec: c.config.Heartbeat, Region: region, }) } -func (c *Client) GetParameters(ctx context.Context, id string) (*mpb.GetParametersResponse, error) { - md := metadata.New(map[string]string{"x-identifier": id}) +func (c *Client) GetParameters(ctx context.Context) (*mpb.GetParametersResponse, error) { + md := metadata.New(map[string]string{"x-identifier": c.id}) ctx = metadata.NewOutgoingContext(ctx, md) return c.ParametersService.GetParameters(ctx, &mpb.GetParametersRequest{}) } + +func (c *Client) AddLog(ctx context.Context, msg string) (*mpb.AddLogResponse, error) { + return c.LogService.AddLog(ctx, &mpb.AddLogRequest{ + Message: msg, + Stack: c.config.Stack, + }) +} diff --git a/infrastructure/grpc_client/config.go b/infrastructure/grpc_client/config.go index 2aba0f1..8782f54 100644 --- a/infrastructure/grpc_client/config.go +++ b/infrastructure/grpc_client/config.go @@ -4,4 +4,5 @@ type Config struct { Endpoint string `yaml:"endpoint"` Region string `yaml:"region"` Heartbeat uint32 `yaml:"heartbeat_in_second"` + Stack string `yaml:"stack"` } diff --git a/infrastructure/grpc_client/interface.go b/infrastructure/grpc_client/interface.go new file mode 100644 index 0000000..d2e6c99 --- /dev/null +++ b/infrastructure/grpc_client/interface.go @@ -0,0 +1,17 @@ +package grpcclient + +import ( + "context" + + mpb "github.com/dezh-tech/immortal/infrastructure/grpc_client/gen" +) + +type IClient interface { + RegisterService(ctx context.Context, + port, region string, + ) (*mpb.RegisterServiceResponse, error) + GetParameters(ctx context.Context) (*mpb.GetParametersResponse, error) + AddLog(ctx context.Context, msg string) (*mpb.AddLogResponse, error) + + SetID(id string) +} diff --git a/infrastructure/redis/redis.go b/infrastructure/redis/redis.go index 3045a5a..cba9bcc 100644 --- a/infrastructure/redis/redis.go +++ b/infrastructure/redis/redis.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/dezh-tech/immortal/pkg/logger" "github.com/redis/go-redis/v9" ) @@ -18,6 +19,8 @@ type Redis struct { } func New(cfg Config) (*Redis, error) { + logger.Info("connecting to redis") + opts, err := redis.ParseURL(cfg.URI) if err != nil { return nil, err @@ -43,6 +46,12 @@ func New(cfg Config) (*Redis, error) { }, nil } +func (r *Redis) Close() error { + logger.Info("closing redis connection") + + return r.Client.Close() +} + // ! note: delayed tasks probably are not concurrent safe at the moment. func (r *Redis) AddDelayedTask(listName string, data string, delay time.Duration, diff --git a/pkg/logger/config.go b/pkg/logger/config.go new file mode 100644 index 0000000..5363a77 --- /dev/null +++ b/pkg/logger/config.go @@ -0,0 +1,10 @@ +package logger + +type Config struct { + Filename string `yaml:"filename"` + LogLevel string `yaml:"level"` + Targets []string `yaml:"targets"` + MaxSize int `yaml:"max_size_in_mb"` + MaxBackups int `yaml:"max_backups"` + Compress bool `yaml:"compress"` +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go new file mode 100644 index 0000000..256b61e --- /dev/null +++ b/pkg/logger/logger.go @@ -0,0 +1,122 @@ +package logger + +import ( + "encoding/hex" + "fmt" + "io" + "os" + "reflect" + "slices" + "strings" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "gopkg.in/natefinch/lumberjack.v2" +) + +var globalInst *logger + +type logger struct { + writer io.Writer +} + +func InitGlobalLogger(cfg *Config) { + writers := []io.Writer{} + + if slices.Contains(cfg.Targets, "file") { + fileWriter := &lumberjack.Logger{ + Filename: cfg.Filename, + MaxSize: cfg.MaxSize, + MaxBackups: cfg.MaxBackups, + Compress: cfg.Compress, + } + writers = append(writers, fileWriter) + } + + if slices.Contains(cfg.Targets, "console") { + writers = append(writers, zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05"}) + } + + globalInst = &logger{ + writer: io.MultiWriter(writers...), + } + + level, err := zerolog.ParseLevel(strings.ToLower(cfg.LogLevel)) + if err != nil { + level = zerolog.InfoLevel + } + zerolog.SetGlobalLevel(level) + + log.Logger = zerolog.New(globalInst.writer).With().Timestamp().Logger() +} + +func addFields(event *zerolog.Event, keyvals ...any) *zerolog.Event { + if len(keyvals)%2 != 0 { + keyvals = append(keyvals, "!MISSING-VALUE!") + } + + for i := 0; i < len(keyvals); i += 2 { + key, ok := keyvals[i].(string) + if !ok { + key = "!INVALID-KEY!" + } + + value := keyvals[i+1] + switch typ := value.(type) { + case fmt.Stringer: + if isNil(typ) { + event.Any(key, typ) + } else { + event.Stringer(key, typ) + } + case error: + event.AnErr(key, typ) + case []byte: + event.Str(key, hex.EncodeToString(typ)) + default: + event.Any(key, typ) + } + } + + return event +} + +func Trace(msg string, keyvals ...any) { + addFields(log.Trace(), keyvals...).Msg(msg) +} + +func Debug(msg string, keyvals ...any) { + addFields(log.Debug(), keyvals...).Msg(msg) +} + +func Info(msg string, keyvals ...any) { + addFields(log.Info(), keyvals...).Msg(msg) +} + +func Warn(msg string, keyvals ...any) { + addFields(log.Warn(), keyvals...).Msg(msg) +} + +func Error(msg string, keyvals ...any) { + addFields(log.Error(), keyvals...).Msg(msg) +} + +func Fatal(msg string, keyvals ...any) { + addFields(log.Fatal(), keyvals...).Msg(msg) +} + +func Panic(msg string, keyvals ...any) { + addFields(log.Panic(), keyvals...).Msg(msg) +} + +func isNil(i any) bool { + if i == nil { + return true + } + + if reflect.TypeOf(i).Kind() == reflect.Ptr { + return reflect.ValueOf(i).IsNil() + } + + return false +} diff --git a/utils/random.go b/pkg/utils/random.go similarity index 100% rename from utils/random.go rename to pkg/utils/random.go diff --git a/utils/url.go b/pkg/utils/url.go similarity index 100% rename from utils/url.go rename to pkg/utils/url.go diff --git a/repository/delete.go b/repository/delete.go index 6e09811..03f5035 100644 --- a/repository/delete.go +++ b/repository/delete.go @@ -2,7 +2,9 @@ package repository import ( "context" + "fmt" + "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/types" "go.mongodb.org/mongo-driver/bson" ) @@ -33,6 +35,12 @@ func (h *Handler) DeleteByID(id string, kind types.Kind) error { _, err := coll.UpdateOne(ctx, filter, update) if err != nil { + _, err := h.grpc.AddLog(context.Background(), + fmt.Sprintf("database error while removing event: %v", err)) + if err != nil { + logger.Error("can't send log to manager", "err", err) + } + return err } diff --git a/repository/event.go b/repository/event.go index b0dd0a4..b677f00 100644 --- a/repository/event.go +++ b/repository/event.go @@ -3,7 +3,9 @@ package repository import ( "context" "errors" + "fmt" + "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/types/event" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" @@ -97,6 +99,12 @@ func (h *Handler) HandleEvent(e *event.Event) error { opts := options.Replace().SetUpsert(true) _, err := coll.ReplaceOne(ctx, filter, e, opts) if err != nil { + _, err := h.grpc.AddLog(context.Background(), + fmt.Sprintf("database error while adding new event: %v", err)) + if err != nil { + logger.Error("can't send log to manager", "err", err) + } + return err } diff --git a/repository/handler.go b/repository/handler.go index 372c5e6..879618b 100644 --- a/repository/handler.go +++ b/repository/handler.go @@ -2,9 +2,24 @@ package repository import ( "github.com/dezh-tech/immortal/infrastructure/database" + grpcclient "github.com/dezh-tech/immortal/infrastructure/grpc_client" "github.com/dezh-tech/immortal/types" ) +type Handler struct { + db *database.Database + grpc grpcclient.IClient + config Config +} + +func New(cfg Config, db *database.Database, grpc grpcclient.IClient) *Handler { + return &Handler{ + db: db, + config: cfg, + grpc: grpc, + } +} + func getCollectionName(k types.Kind) string { collName, ok := types.KindToName[k] if ok { @@ -29,15 +44,3 @@ func getCollectionName(k types.Kind) string { return "unknown" } - -type Handler struct { - db *database.Database - config Config -} - -func New(db *database.Database, cfg Config) *Handler { - return &Handler{ - db: db, - config: cfg, - } -} diff --git a/repository/req.go b/repository/req.go index c0722bc..b6b0c32 100644 --- a/repository/req.go +++ b/repository/req.go @@ -2,11 +2,15 @@ package repository import ( "context" + "errors" + "fmt" + "github.com/dezh-tech/immortal/pkg/logger" "github.com/dezh-tech/immortal/types" "github.com/dezh-tech/immortal/types/event" "github.com/dezh-tech/immortal/types/filter" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -71,6 +75,14 @@ func (h *Handler) HandleReq(fs filter.Filters) ([]event.Event, error) { cursor, err := collection.Find(ctx, query, opts) if err != nil { + if !errors.Is(err, mongo.ErrNoDocuments) { + _, err := h.grpc.AddLog(context.Background(), + fmt.Sprintf("database error while making query: %v", err)) + if err != nil { + logger.Error("can't send log to manager", "err", err) + } + } + return nil, err } diff --git a/types/message/message.go b/types/message/message.go index ddf61c4..e78a7c5 100644 --- a/types/message/message.go +++ b/types/message/message.go @@ -14,7 +14,7 @@ import ( "github.com/tidwall/gjson" // TODO::: remove/replace me! ) -// Message reperesents an NIP-01 message which can be sent to or received by client. +// Message represents an NIP-01 message which can be sent to or received by client. type Message interface { Type() string DecodeFromJSON([]byte) error @@ -158,7 +158,7 @@ func (rm Req) EncodeToJSON() ([]byte, error) { return nil, nil } -// Notice reperesents a NIP-01 NOTICE message. +// Notice represents a NIP-01 NOTICE message. type Notice string func MakeNotice(msg string) []byte { @@ -200,7 +200,7 @@ func (nm Notice) EncodeToJSON() ([]byte, error) { return res, nil } -// EOSE reperesents a NIP-01 EOSE message. +// EOSE represents a NIP-01 EOSE message. type EOSE string func MakeEOSE(sID string) []byte { @@ -242,7 +242,7 @@ func (em EOSE) EncodeToJSON() ([]byte, error) { return res, nil } -// Close reperesents a NIP-01 CLOSE message. +// Close represents a NIP-01 CLOSE message. type Close string func (Close) Type() string { return "CLOSE" } @@ -282,7 +282,7 @@ func (cm Close) EncodeToJSON() ([]byte, error) { return res, nil } -// Closed reperesents a NIP-01 CLOSED message. +// Closed represents a NIP-01 CLOSED message. type Closed struct { SubscriptionID string Reason string @@ -335,7 +335,7 @@ func (cm Closed) EncodeToJSON() ([]byte, error) { return res, nil } -// OK reperesents a NIP-01 OK message. +// OK represents a NIP-01 OK message. type OK struct { OK bool EventID string @@ -394,7 +394,7 @@ func (om OK) EncodeToJSON() ([]byte, error) { return res, nil } -// Auth reperesents a NIP-01 AUTH message. +// Auth represents a NIP-01 AUTH message. type Auth struct { Challenge string Event event.Event