Skip to content

Commit

Permalink
Expose gameserver metrics via HTTP
Browse files Browse the repository at this point in the history
This commit adds functionality to expose gameserver metrics via HTTP.
These metrics are consumed by the servers-registry and displayed by the `.tc9 ws ls` command.
This enhancement allows for better monitoring and management of game-server instances and makes it possible to implement a Kubernetes operator for autoscaling.
  • Loading branch information
walkline committed Mar 1, 2024
1 parent 15cd821 commit 709f411
Show file tree
Hide file tree
Showing 24 changed files with 697 additions and 332 deletions.
14 changes: 12 additions & 2 deletions api/proto/v1/servers-registry/registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,22 @@ message ListGameServersForRealmRequest {
}

// GameServerDetailed describes one registered game server: its addresses,
// the realm it belongs to, its map lists and its latest runtime metrics.
message GameServerDetailed {
// Diff carries summary statistics (mean, median, 95th/99th percentile, max)
// for a game server metric. The chat command that prints these values
// formats them with an "ms" suffix, so they are presumably milliseconds —
// TODO confirm against the metrics producer.
message Diff {
uint32 mean = 1;
uint32 median = 2;
uint32 percentile95 = 3;
uint32 percentile99 = 4;
uint32 max = 5;
};

string address = 1;
string healthAddress = 2;
string grpcAddress = 3;
uint32 realmID = 4;
// NOTE(review): availableMaps/assignedMaps appear here both with tags 5/6
// and with tags 7/8, while tags 5/6 are also used by activeConnections and
// diff. If the 5/6 map declarations are the previous schema, tags 5 and 6
// have been reused with different types, which is not wire-compatible with
// peers still built against the old definition — confirm that every
// producer and consumer is regenerated and deployed together.
repeated uint32 availableMaps = 5;
repeated uint32 assignedMaps = 6;
// Number of active connections reported for this game server.
uint32 activeConnections = 5;
// Latest Diff statistics reported for this game server.
Diff diff = 6;
repeated uint32 availableMaps = 7;
repeated uint32 assignedMaps = 8;
}

message ListGameServersForRealmResponse {
Expand Down
3 changes: 2 additions & 1 deletion apps/game-load-balancer/cmd/game-load-balancer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

_ "github.com/go-sql-driver/mysql"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"

Expand Down Expand Up @@ -70,7 +71,7 @@ func main() {
groupClient := groupService(conf)

healthandmetrics.EnableActiveConnectionsMetrics()
healthCheckServer := healthandmetrics.NewServer(conf.HealthCheckPort, true)
healthCheckServer := healthandmetrics.NewServer(conf.HealthCheckPort, promhttp.Handler())

go func() {
err = healthCheckServer.ListenAndServe()
Expand Down
2 changes: 1 addition & 1 deletion apps/game-load-balancer/session/character.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *GameSession) CreateCharacter(ctx context.Context, p *packet.Packet) err
}

go socket.ListenAndProcess(s.ctx)
newCtx, cancel := context.WithTimeout(s.ctx, time.Second*5)
newCtx, cancel := context.WithTimeout(s.ctx, time.Second*20)
defer cancel()

waitDone := make(chan struct{})
Expand Down
10 changes: 9 additions & 1 deletion apps/game-load-balancer/session/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package session
import (
"context"
"fmt"
pbGroup "github.com/walkline/ToCloud9/gen/group/pb"
"strings"

root "github.com/walkline/ToCloud9/apps/game-load-balancer"
eBroadcaster "github.com/walkline/ToCloud9/apps/game-load-balancer/events-broadcaster"
"github.com/walkline/ToCloud9/apps/game-load-balancer/packet"
pbChat "github.com/walkline/ToCloud9/gen/chat/pb"
pbGroup "github.com/walkline/ToCloud9/gen/group/pb"
pbGuild "github.com/walkline/ToCloud9/gen/guilds/pb"
pbServ "github.com/walkline/ToCloud9/gen/servers-registry/pb"
)
Expand Down Expand Up @@ -277,6 +277,14 @@ func (s *GameSession) handleCommandMsgListGameServers(ctx context.Context) error
s.SendSysMessage(fmt.Sprintf("> Node address: %s.", server.Address))
s.SendSysMessage(fmt.Sprintf(" Available maps: %s.", mapsAvailable))
s.SendSysMessage(fmt.Sprintf(" Assigned maps: %s.", assignedMaps))
s.SendSysMessage(fmt.Sprintf(" Active connections: %d.", server.ActiveConnections))
s.SendSysMessage(
fmt.Sprintf(
" Diff (mean, median, 95, 99, max): %dms, %dms, %dms, %dms, %dms.",
server.Diff.Mean, server.Diff.Median, server.Diff.Percentile95,
server.Diff.Percentile99, server.Diff.Max,
),
)

if isCurrentlyUsing {
s.SendSysMessage(" You are |cff4CFF00connected |rto this one.")
Expand Down
1 change: 1 addition & 0 deletions apps/servers-registry/cmd/servers-registry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func main() {
mainContext,
repo.NewGameServerRedisRepo(rdb),
healthChecker,
metricsConsumer,
binpack.NewBinPackBalancer(binpack.DefaultMapsWeight), // TODO: implement providing custom maps weight list.
events.NewServerRegistryProducerNatsJSON(nc, "0.0.1"),
supportedRealms,
Expand Down
16 changes: 16 additions & 0 deletions apps/servers-registry/repo/game-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import (
"sort"
)

// DiffData is a summary of a game server's "diff" metric as stored in the
// registry. All values are plain unsigned integers; the unit is whatever the
// metrics source reports (presumably milliseconds — TODO confirm).
type DiffData struct {
	// Central values of the sampled distribution.
	Mean, Median uint32

	// Upper-tail values of the sampled distribution.
	Percentile95, Percentile99 uint32

	// Max is the largest sampled value.
	Max uint32
}

type GameServer struct {
ID string
Address string
Expand All @@ -13,6 +21,9 @@ type GameServer struct {
RealmID uint32
AvailableMaps []uint32

ActiveConnections uint32
Diff DiffData

// AssignedMapsToHandle list of maps that loadbalancer algorithm assigned for this server.
AssignedMapsToHandle []uint32

Expand All @@ -26,6 +37,10 @@ func (g *GameServer) HealthCheckAddress() string {
return g.HealthCheckAddr
}

// MetricsAddress returns the address to read this server's metrics from.
// It returns the same HealthCheckAddr value that HealthCheckAddress returns,
// i.e. metrics and health checks share one address.
func (g *GameServer) MetricsAddress() string {
return g.HealthCheckAddr
}

func (g *GameServer) CanHandleMap(id uint32) bool {
i := sort.Search(len(g.AssignedMapsToHandle), func(i int) bool { return g.AssignedMapsToHandle[i] >= id })
// item exists
Expand Down Expand Up @@ -55,6 +70,7 @@ func (g *GameServer) Copy() GameServer {

type GameServerRepo interface {
Upsert(context.Context, *GameServer) error
Update(ctx context.Context, id string, f func(*GameServer) *GameServer) error
Remove(ctx context.Context, id string) error
ListByRealm(ctx context.Context, realmID uint32) ([]GameServer, error)
One(ctx context.Context, id string) (*GameServer, error)
Expand Down
14 changes: 14 additions & 0 deletions apps/servers-registry/repo/game-server_inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ func (g *gameServerInMemRepo) Upsert(ctx context.Context, server *GameServer) er
return nil
}

// Update applies f to the stored game server whose ID equals id and writes
// the value f returns back into the storage slot.
//
// f receives a pointer to the stored element itself, so changes f makes in
// place are already part of the repo state. If f returns nil the element is
// kept as f left it; previously a nil result was dereferenced and panicked.
//
// If no stored server has the given id nothing is changed and nil is
// returned. The repo mutex is held for the whole call, including the call to
// f, so f should not call back into this repo.
func (g *gameServerInMemRepo) Update(ctx context.Context, id string, f func(*GameServer) *GameServer) error {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	for i := range g.storage {
		if g.storage[i].ID != id {
			continue
		}

		// Guard against a nil result instead of dereferencing it.
		if updated := f(&g.storage[i]); updated != nil {
			g.storage[i] = *updated
		}
		return nil
	}

	return nil
}

func (g *gameServerInMemRepo) Remove(ctx context.Context, id string) error {
g.mutex.Lock()
defer g.mutex.Unlock()
Expand Down
23 changes: 23 additions & 0 deletions apps/servers-registry/repo/game-server_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,29 @@ func (g *gameServerRedisRepo) Upsert(ctx context.Context, server *GameServer) er
return nil
}

// Update loads the game server stored under id, passes it to f and stores
// the value f returns.
//
// The record is read with Get, decoded from JSON, transformed by f, encoded
// again and written with Set under the key derived from the returned
// server's ID. If f returns nil, the decoded value (including any in-place
// changes f made to it) is stored instead; previously a nil result caused a
// nil pointer dereference when the key was built.
//
// Errors from Get (including a missing key), from JSON decoding/encoding and
// from Set are returned unchanged.
//
// NOTE(review): the Get and Set are two separate calls, so a concurrent
// write to the same key between them would be overwritten — confirm whether
// callers need this to be atomic.
func (g *gameServerRedisRepo) Update(ctx context.Context, id string, f func(*GameServer) *GameServer) error {
	res := g.rdb.Get(ctx, g.key(id))
	if err := res.Err(); err != nil {
		return err
	}

	v := &GameServer{}
	if err := json.Unmarshal([]byte(res.Val()), v); err != nil {
		return err
	}

	newV := f(v)
	if newV == nil {
		// Treat a nil result as "keep the value f was given".
		newV = v
	}

	d, err := json.Marshal(newV)
	if err != nil {
		return err
	}

	return g.rdb.Set(ctx, g.key(newV.ID), d, 0).Err()
}

func (g *gameServerRedisRepo) Remove(ctx context.Context, id string) error {
key := g.key(id)
res := g.rdb.Get(ctx, key)
Expand Down
20 changes: 14 additions & 6 deletions apps/servers-registry/server/registry-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,20 @@ func (s *serversRegistry) ListGameServersForRealm(ctx context.Context, request *
respServers := make([]*pb.GameServerDetailed, len(servers))
for i := range servers {
respServers[i] = &pb.GameServerDetailed{
Address: servers[i].Address,
HealthAddress: servers[i].HealthCheckAddr,
GrpcAddress: servers[i].GRPCAddress,
RealmID: servers[i].RealmID,
AvailableMaps: servers[i].AvailableMaps,
AssignedMaps: servers[i].AssignedMapsToHandle,
Address: servers[i].Address,
HealthAddress: servers[i].HealthCheckAddr,
GrpcAddress: servers[i].GRPCAddress,
RealmID: servers[i].RealmID,
ActiveConnections: servers[i].ActiveConnections,
AvailableMaps: servers[i].AvailableMaps,
AssignedMaps: servers[i].AssignedMapsToHandle,
Diff: &pb.GameServerDetailed_Diff{
Mean: servers[i].Diff.Mean,
Median: servers[i].Diff.Median,
Percentile95: servers[i].Diff.Percentile95,
Percentile99: servers[i].Diff.Percentile99,
Max: servers[i].Diff.Max,
},
}
}

Expand Down
33 changes: 33 additions & 0 deletions apps/servers-registry/service/game-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type GameServer interface {
// gameServerImpl bundles the dependencies the game server service works
// with: storage, health checking, metrics consumption, map balancing and
// event publishing.
type gameServerImpl struct {
// r is the repository game servers are stored in and read from.
r repo.GameServerRepo
// checker is the health checker game servers are registered with.
checker healthandmetrics.HealthChecker
// metrics is the metrics consumer game servers are registered with as
// observables.
metrics healthandmetrics.MetricsConsumer
// mapBalancer is the map distributor used by this service.
mapBalancer mapbalancing.MapDistributor
// eProducer is the servers-registry events producer.
eProducer events.ServerRegistryProducer
}
Expand All @@ -33,13 +34,15 @@ func NewGameServer(
ctx context.Context,
r repo.GameServerRepo,
checker healthandmetrics.HealthChecker,
metrics healthandmetrics.MetricsConsumer,
mapBalancer mapbalancing.MapDistributor,
eProducer events.ServerRegistryProducer,
supportedRealmIDs []uint32,
) (GameServer, error) {
service := &gameServerImpl{
r: r,
checker: checker,
metrics: metrics,
mapBalancer: mapBalancer,
eProducer: eProducer,
}
Expand All @@ -50,6 +53,12 @@ func NewGameServer(
}
})

metrics.AddObserver(func(observable healthandmetrics.MetricsObservable, read *healthandmetrics.MetricsRead) {
if gs, ok := observable.(*repo.GameServer); ok {
service.onMetricsUpdate(gs, read)
}
})

for _, id := range supportedRealmIDs {
servers, err := r.ListByRealm(ctx, id)
if err != nil {
Expand All @@ -60,6 +69,11 @@ func NewGameServer(
if err = checker.AddHealthCheckObject(&servers[i]); err != nil {
return nil, err
}

err = metrics.AddMetricsObservable(&servers[i])
if err != nil {
return nil, err
}
}
}

Expand All @@ -75,6 +89,10 @@ func (g *gameServerImpl) Register(ctx context.Context, server *repo.GameServer)
return err
}

if err := g.metrics.AddMetricsObservable(server); err != nil {
return err
}

if err := g.r.Upsert(ctx, server); err != nil {
return err
}
Expand Down Expand Up @@ -243,3 +261,18 @@ func (g *gameServerImpl) distributeMapsToServers(ctx context.Context, servers []

return distributed, nil
}

// onMetricsUpdate copies the values of a metrics read into the repo record of
// the given server (looked up by server.ID). A failed repo update is logged
// and otherwise ignored.
func (g *gameServerImpl) onMetricsUpdate(server *repo.GameServer, m *healthandmetrics.MetricsRead) {
	// applyRead overwrites the metric fields of the stored record with the
	// values from m and hands the same record back to the repo.
	applyRead := func(stored *repo.GameServer) *repo.GameServer {
		stored.ActiveConnections = uint32(m.ActiveConnections)

		diff := &stored.Diff
		diff.Mean = uint32(m.DelayMean)
		diff.Median = uint32(m.DelayMedian)
		diff.Percentile95 = uint32(m.Delay95Percentile)
		diff.Percentile99 = uint32(m.Delay99Percentile)
		diff.Max = uint32(m.DelayMax)

		return stored
	}

	if err := g.r.Update(context.Background(), server.ID, applyRead); err != nil {
		log.Error().Err(err).Msg("can't update metrics for game server")
	}
}
2 changes: 1 addition & 1 deletion game-server/azerothcore/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ RUN update-alternatives --install /usr/bin/c++ c++ /usr/bin/clang 100
RUN mkdir repo
RUN cd repo && git init && \
git remote add origin https://github.com/walkline/azerothcore-wotlk.git && \
git fetch --depth 1 origin 6db9abf503ba09a665cc3b65e2d4db1da013da4c && \
git fetch --depth 1 origin f41e5d6c8d4026f6326fbeaa30f867a9c767d246 && \
git checkout FETCH_HEAD

COPY --from=build-sidecar ./go/src/github.com/walkline/ToCloud9/libsidecar.so ./repo/deps/libsidecar/libsidecar.so
Expand Down
57 changes: 4 additions & 53 deletions game-server/libsidecar/grpc-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/walkline/ToCloud9/gen/worldserver/pb"
)

var grpcReadRequestsQueue = queue.NewHandlersFIFOQueue()
var grpcWriteRequestsQueue = queue.NewHandlersFIFOQueue()
var readRequestsQueue = queue.NewHandlersFIFOQueue()
var writeRequestsQueue = queue.NewHandlersFIFOQueue()

func SetupGRPCService(conf *config.Config) (net.Listener, *grpc.Server) {
grpcapi.LibVer = libVer
Expand All @@ -39,59 +39,10 @@ func SetupGRPCService(conf *config.Config) (net.Listener, *grpc.Server) {
CanPlayerInteractWithGO: CanPlayerInteractWithGOAndTypeHandler,
},
time.Second*5,
grpcReadRequestsQueue,
grpcWriteRequestsQueue,
readRequestsQueue,
writeRequestsQueue,
),
)

return lis, grpcServer
}

// TC9ProcessGRPCRequests drains the queued gRPC handlers: first every handler
// in the read queue, then every handler in the write queue, each on the
// calling goroutine.
//
//export TC9ProcessGRPCRequests
func TC9ProcessGRPCRequests() {
	// Read handlers are deliberately processed sequentially. A parallel
	// variant is kept below for reference; it is disabled because the
	// goroutine setup time outweighs the benefit for a low number of
	// requests. It can be enabled if read requests increase.
	//
	//// TODO: make this configurable.
	//const readGoroutineCount = 4
	//
	//// Handle read operations.
	//// Read operation is safe to process in parallel.
	//wg := sync.WaitGroup{}
	//wg.Add(readGoroutineCount)
	//for i := 0; i < readGoroutineCount; i++ {
	//	go func() {
	//		defer wg.Done()
	//
	//		handler := grpcReadRequestsQueue.Pop()
	//		for handler != nil {
	//			handler.Handle()
	//			handler = grpcReadRequestsQueue.Pop()
	//		}
	//	}()
	//}
	//
	//wg.Wait()
	//
	//// Handle write operations.
	//// Since TC is not thread-safe for write operations, we can have only 1 goroutine to process.
	//handler := grpcWriteRequestsQueue.Pop()
	//for handler != nil {
	//	handler.Handle()
	//	handler = grpcWriteRequestsQueue.Pop()
	//}

	// Pop until the read queue reports no more handlers.
	for h := grpcReadRequestsQueue.Pop(); h != nil; h = grpcReadRequestsQueue.Pop() {
		h.Handle()
	}

	// Then do the same for the write queue.
	for h := grpcWriteRequestsQueue.Pop(); h != nil; h = grpcWriteRequestsQueue.Pop() {
		h.Handle()
	}
}
Loading

0 comments on commit 709f411

Please sign in to comment.