Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: minor fixes #519

Merged
merged 7 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions ssh_toolkit/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ func ExecCommandOverSSH(cmd string,
// fetch ssh client
sshRecord, err := getSSHClient(host, port, user, privateKey)
if err != nil {
if isErrorWhenSSHClientNeedToBeRecreated(err) {
DeleteSSHClient(host)
}
return err
}
// create session
session, err := getSSHSessionWithTimeout(sshRecord, sessionTimeoutSeconds)
if err != nil {
if isErrorWhenSSHClientNeedToBeRecreated(err) {
deleteSSHClient(host)
DeleteSSHClient(host)
}
return err
}
Expand All @@ -44,7 +47,18 @@ func ExecCommandOverSSH(cmd string,
session.Stdout = stdoutBuf
session.Stderr = stderrBuf
// run command
return session.Run(cmd)
err = session.Run(cmd)
if err != nil {
if isErrorWhenSSHClientNeedToBeRecreated(err) {
DeleteSSHClient(host)
}
if isErrorWhenSSHClientNeedToBeRecreated(errors.New(stderrBuf.String())) {
DeleteSSHClient(host)
return fmt.Errorf("%s - %s", err, stderrBuf.String())
}
return err
}
return nil
}

// private functions
Expand Down
3 changes: 3 additions & 0 deletions ssh_toolkit/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ var errorsWhenSSHClientNeedToBeRecreated = []string{
"failed to dial ssh",
"handshake failed",
"unable to authenticate",
"connection refused",
"use of closed network connection",
"rejected: too many authentication failures",
"rejected: connection closed by remote host",
"rejected: connect failed",
"open failed",
"handshake failed",
"EOF",
}

func isErrorWhenSSHClientNeedToBeRecreated(err error) bool {
Expand Down
5 changes: 4 additions & 1 deletion ssh_toolkit/net_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ func NetConnOverSSH(
// fetch ssh client
sshRecord, err := getSSHClient(host, port, user, privateKey)
if err != nil {
if isErrorWhenSSHClientNeedToBeRecreated(err) {
DeleteSSHClient(host)
}
return nil, err
}
// create net connection
conn, err := dialWithTimeout(sshRecord, network, address, time.Duration(netTimeoutSeconds)*time.Second)
if err != nil && isErrorWhenSSHClientNeedToBeRecreated(err) {
deleteSSHClient(host)
DeleteSSHClient(host)
}
return conn, err
}
Expand Down
6 changes: 3 additions & 3 deletions ssh_toolkit/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newSSHClient(host string, port int, user string, privateKey string, timeout
signer, err := ssh.ParsePrivateKey([]byte(privateKey))
if err != nil {
sshClientRecord.mutex.Unlock()
deleteSSHClient(host)
DeleteSSHClient(host)
return nil, err
}
config := &ssh.ClientConfig{
Expand All @@ -82,15 +82,15 @@ func newSSHClient(host string, port int, user string, privateKey string, timeout
client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", host, port), config)
if err != nil {
sshClientRecord.mutex.Unlock()
deleteSSHClient(host)
DeleteSSHClient(host)
return nil, err
}
sshClientRecord.client = client
sshClientRecord.mutex.Unlock()
return client, nil
}

func deleteSSHClient(host string) {
func DeleteSSHClient(host string) {
sshClientPool.mutex.Lock()
clientEntry, ok := sshClientPool.clients[host]
if ok {
Expand Down
4 changes: 4 additions & 0 deletions swiftwave_service/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ var RequiredServerDependencies = []string{
"git",
"tar",
"nfs",
"cifs",
"docker",
}

Expand All @@ -224,6 +225,7 @@ var DependencyCheckCommands = map[string]string{
"git": "which git",
"tar": "which tar",
"nfs": "which nfsstat",
"cifs": "which mount.cifs",
"docker": "which docker",
}

Expand All @@ -234,6 +236,7 @@ var DebianDependenciesInstallCommands = map[string]string{
"git": "apt install -y git",
"tar": "apt install -y tar",
"nfs": "apt install -y nfs-common",
"cifs": "apt install -y cifs-utils",
"docker": "curl -fsSL get.docker.com | sh -",
}
var FedoraDependenciesInstallCommands = map[string]string{
Expand All @@ -243,6 +246,7 @@ var FedoraDependenciesInstallCommands = map[string]string{
"git": "dnf install -y git",
"tar": "dnf install -y tar",
"nfs": "dnf install -y nfs-utils",
"cifs": "dnf install -y cifs-utils",
"docker": "curl -fsSL get.docker.com | sh -",
}

Expand Down
6 changes: 5 additions & 1 deletion swiftwave_service/cronjob/server_status_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func (m Manager) monitorServerStatus() {
continue
}
go func(server core.Server) {
if server.Status == core.ServerOffline {
ssh_toolkit.DeleteSSHClient(server.HostName)
}
if m.isServerOnline(server) {
err = core.MarkServerAsOnline(&m.ServiceManager.DbClient, &server)
if err != nil {
Expand All @@ -60,7 +63,8 @@ func (m Manager) isServerOnline(server core.Server) bool {
for i := 0; i < 3; i++ {
cmd := "echo ok"
stdoutBuf := new(bytes.Buffer)
err := ssh_toolkit.ExecCommandOverSSH(cmd, stdoutBuf, nil, 3, server.IP, server.SSHPort, server.User, m.Config.SystemConfig.SshPrivateKey)
stderrBuf := new(bytes.Buffer)
err := ssh_toolkit.ExecCommandOverSSH(cmd, stdoutBuf, stderrBuf, 3, server.IP, server.SSHPort, server.User, m.Config.SystemConfig.SshPrivateKey)
if err != nil {
continue
}
Expand Down
124 changes: 124 additions & 0 deletions swiftwave_service/db/custom_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package db

import (
"context"
"errors"
"fmt"
"github.com/swiftwave-org/swiftwave/swiftwave_service/logger"
gormlogger "gorm.io/gorm/logger"
"path/filepath"
"runtime"
"strconv"
"time"
)

// newCustomLogger initialize logger
func newCustomLogger(writer gormlogger.Writer, config gormlogger.Config) gormlogger.Interface {
var (
infoStr = "%s: [info] "
warnStr = "%s: [warn] "
errStr = "%s: [error] "
traceStr = "%s: [trace] [%.3fms] [rows:%v] %s"
traceWarnStr = "%s: [trace] %s\n[%.3fms] [rows:%v] %s"
traceErrStr = "%s: [trace] %s\n[%.3fms] [rows:%v] %s"
)

if config.Colorful {
logger.DatabaseLogger.Printf("ignoring colorful logger")
}

return &customLogger{
Writer: writer,
Config: config,
infoStr: infoStr,
warnStr: warnStr,
errStr: errStr,
traceStr: traceStr,
traceWarnStr: traceWarnStr,
traceErrStr: traceErrStr,
}
}

type customLogger struct {
gormlogger.Writer
gormlogger.Config
infoStr, warnStr, errStr string
traceStr, traceErrStr, traceWarnStr string
}

// LogMode log mode
func (l *customLogger) LogMode(level gormlogger.LogLevel) gormlogger.Interface {
customLogger := *l
customLogger.LogLevel = level
return &customLogger
}

// Info print info
func (l *customLogger) Info(ctx context.Context, msg string, data ...interface{}) {
if l.LogLevel >= gormlogger.Info {
l.Printf(l.infoStr+msg, append([]interface{}{fetchCaller()}, data...)...)
}
}

// Warn print warn messages
func (l *customLogger) Warn(ctx context.Context, msg string, data ...interface{}) {
if l.LogLevel >= gormlogger.Warn {
l.Printf(l.warnStr+msg, append([]interface{}{fetchCaller()}, data...)...)
}
}

// Error print error messages
func (l *customLogger) Error(ctx context.Context, msg string, data ...interface{}) {
if l.LogLevel >= gormlogger.Error {
l.Printf(l.errStr+msg, append([]interface{}{fetchCaller()}, data...)...)
}
}

// Trace print sql message
func (l *customLogger) Trace(ctx context.Context, begin time.Time, fc func() (string, int64), err error) {
if l.LogLevel <= gormlogger.Silent {
return
}

elapsed := time.Since(begin)
switch {
case err != nil && l.LogLevel >= gormlogger.Error && (!errors.Is(err, gormlogger.ErrRecordNotFound) || !l.IgnoreRecordNotFoundError):
sql, rows := fc()
if rows == -1 {
l.Printf(l.traceErrStr, fetchCaller(), err, float64(elapsed.Nanoseconds())/1e6, "-", sql)
} else {
l.Printf(l.traceErrStr, fetchCaller(), err, float64(elapsed.Nanoseconds())/1e6, rows, sql)
}
case elapsed > l.SlowThreshold && l.SlowThreshold != 0 && l.LogLevel >= gormlogger.Warn:
sql, rows := fc()
slowLog := fmt.Sprintf("SLOW SQL >= %v", l.SlowThreshold)
if rows == -1 {
l.Printf(l.traceWarnStr, fetchCaller(), slowLog, float64(elapsed.Nanoseconds())/1e6, "-", sql)
} else {
l.Printf(l.traceWarnStr, fetchCaller(), slowLog, float64(elapsed.Nanoseconds())/1e6, rows, sql)
}
case l.LogLevel == gormlogger.Info:
sql, rows := fc()
if rows == -1 {
l.Printf(l.traceStr, fetchCaller(), float64(elapsed.Nanoseconds())/1e6, "-", sql)
} else {
l.Printf(l.traceStr, fetchCaller(), float64(elapsed.Nanoseconds())/1e6, rows, sql)
}
}
}

// ParamsFilter filter params
func (l *customLogger) ParamsFilter(ctx context.Context, sql string, params ...interface{}) (string, []interface{}) {
if l.Config.ParameterizedQueries {
return sql, nil
}
return sql, params
}

func fetchCaller() string {
_, file, line, ok := runtime.Caller(4)
if !ok {
return ""
}
return filepath.Base(file) + ":" + strconv.Itoa(line)
}
1 change: 0 additions & 1 deletion swiftwave_service/db/migrations/20240413191732_init.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ CREATE TABLE "public"."servers" (
"ip" text NULL,
"host_name" text NULL,
"user" text NULL,
"ssh_port" bigint NULL DEFAULT 22,
"schedule_deployments" boolean NULL DEFAULT true,
"docker_unix_socket_path" text NULL,
"swarm_mode" text NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- reverse: modify "servers" table
ALTER TABLE "public"."servers" DROP COLUMN "ssh_port";
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- modify "servers" table
ALTER TABLE "public"."servers" ADD COLUMN "ssh_port" bigint NULL DEFAULT 22;
10 changes: 6 additions & 4 deletions swiftwave_service/db/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
h1:6dZWAUCl1AcaOu5dZJdHaNZ3zngD6MC28BhA19NtHsU=
h1:icdTT4yZ8uFtUOwQFRGT+K1FgTqhqKnIjnOOrTRksdo=
20240413191732_init.down.sql h1:HoitObGwuKF/akF4qg3dol2FfNTLCEuf6wHYDuCez8I=
20240413191732_init.up.sql h1:SSfeL4XTzfkPhdTfo6ElhmZCvlDvESyB4ZLqJbvMkM0=
20240415051843_cifs_config_added.down.sql h1:bpHfK0AkgKiBDjOie+9t/u7uxvjnkSN9skIA85ydtSM=
20240415051843_cifs_config_added.up.sql h1:zgeZR4RgcR+NLCaA1M377m5wMUXNC+gaIsYu9dOLhMM=
20240413191732_init.up.sql h1:USKdQx/yTz1KJ0+mDwYGhKm3WzX7k+I9+6B6SxImwaE=
20240414051823_server_custom_ssh_port_added.down.sql h1:IC1DFQBQceTPTRdZOo5/WqytH+ZbgcKrQuMCkhArF/0=
20240414051823_server_custom_ssh_port_added.up.sql h1:e8WCM91vs1tYOSNujP3H8VL5sdIgL3zJCUO2tJJ4V4A=
20240415051843_cifs_config_added.down.sql h1:mpfh0mD7rG8QzLr+O2lURFSsmVjkZabQZgDqxa4A00M=
20240415051843_cifs_config_added.up.sql h1:O5MSdlTbkma6mtjfsiOKgvuMMnndr2azAq/QnKVbPv8=
19 changes: 15 additions & 4 deletions swiftwave_service/db/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@ func GetClient(config *local_config.Config, maxRetry uint) (*gorm.DB, error) {
if config.IsDevelopmentMode {
logLevel = gormlogger.Info
}
ignoreRecordNotFoundError := true
if config.IsDevelopmentMode {
ignoreRecordNotFoundError = false
}
parameterizedQueries := true
if config.IsDevelopmentMode {
parameterizedQueries = false
}

gormConfig := &gorm.Config{
SkipDefaultTransaction: true,
Logger: gormlogger.New(logger.DatabaseLogger, gormlogger.Config{
SlowThreshold: 500 * time.Millisecond,
Colorful: false,
LogLevel: logLevel,
Logger: newCustomLogger(logger.DatabaseLogger, gormlogger.Config{
SlowThreshold: 500 * time.Millisecond,
Colorful: false,
LogLevel: logLevel,
IgnoreRecordNotFoundError: ignoreRecordNotFoundError,
ParameterizedQueries: parameterizedQueries,
}),
}
currentRetry := uint(0)
Expand Down
22 changes: 15 additions & 7 deletions swiftwave_service/graphql/graphql_object_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,30 @@ func persistentVolumeToGraphqlObject(record *core.PersistentVolume) *model.Persi

// persistentVolumeInputToDatabaseObject : converts PersistentVolumeInput to PersistentVolumeDatabaseObject
func persistentVolumeInputToDatabaseObject(record *model.PersistentVolumeInput) *core.PersistentVolume {
return &core.PersistentVolume{
Name: record.Name,
Type: core.PersistentVolumeType(record.Type),
NFSConfig: core.NFSConfig{
nfsConfig := core.NFSConfig{}
if record.Type == model.PersistentVolumeTypeNfs {
nfsConfig = core.NFSConfig{
Host: record.NfsConfig.Host,
Path: record.NfsConfig.Path,
Version: record.NfsConfig.Version,
},
CIFSConfig: core.CIFSConfig{
}
}
cifsConfig := core.CIFSConfig{}
if record.Type == model.PersistentVolumeTypeCifs {
cifsConfig = core.CIFSConfig{
Share: record.CifsConfig.Share,
Host: record.CifsConfig.Host,
Username: record.CifsConfig.Username,
Password: record.CifsConfig.Password,
FileMode: record.CifsConfig.FileMode,
DirMode: record.CifsConfig.DirMode,
},
}
}
return &core.PersistentVolume{
Name: record.Name,
Type: core.PersistentVolumeType(record.Type),
NFSConfig: nfsConfig,
CIFSConfig: cifsConfig,
}
}

Expand Down
2 changes: 1 addition & 1 deletion swiftwave_service/logger/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strconv"
)

var DatabaseLogger = log.New(os.Stdout, "[DATABASE] ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile)
var DatabaseLogger = log.New(os.Stdout, "[DATABASE] ", log.Ldate|log.Ltime|log.LUTC)
var DatabaseLoggerError = log.New(os.Stdout, "[DATABASE] ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile)
var CronJobLogger = log.New(os.Stdout, "[CRONJOB] ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile)
var CronJobLoggerError = log.New(os.Stdout, "[CRONJOB] ", log.Ldate|log.Ltime|log.LUTC|log.Lshortfile)
Expand Down
Loading