Skip to content

Commit

Permalink
refactor: remove grpc lib to use connect-go (#39)
Browse files Browse the repository at this point in the history
* refactor: remove grpc lib to use connect-go

* chore: cleanup

* feat: fix telemetry and add observability example
  • Loading branch information
Tochemey authored Apr 15, 2023
1 parent 0bae7aa commit e318b7f
Show file tree
Hide file tree
Showing 27 changed files with 658 additions and 1,842 deletions.
6 changes: 5 additions & 1 deletion Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ PROJECT tochemey/goakt

FROM tochemey/docker-go:1.20.1-0.7.0

# install the various tools to generate connect-go
RUN go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
RUN go install github.com/bufbuild/connect-go/cmd/protoc-gen-connect-go@latest

# run a PR branch is created
#pr-pipeline:
# PIPELINE
Expand Down Expand Up @@ -72,7 +76,7 @@ internal-pb:
--path protos/internal/goakt

# save artifact to
SAVE ARTIFACT gen/goakt AS LOCAL internal/goaktpb
SAVE ARTIFACT gen/goakt AS LOCAL internal/goakt

protogen:
# copy the proto files to generate
Expand Down
166 changes: 91 additions & 75 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@ package actors
import (
"context"
"fmt"
"net/http"
"time"

"github.com/bufbuild/connect-go"
otelconnect "github.com/bufbuild/connect-opentelemetry-go"
cmp "github.com/orcaman/concurrent-map/v2"
"github.com/pkg/errors"
goaktpb "github.com/tochemey/goakt/internal/goaktpb/v1"
"github.com/tochemey/goakt/internal/grpc"
goaktpb "github.com/tochemey/goakt/internal/goakt/v1"
"github.com/tochemey/goakt/internal/goakt/v1/goaktv1connect"
"github.com/tochemey/goakt/internal/resync"
"github.com/tochemey/goakt/internal/telemetry"
"github.com/tochemey/goakt/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
ggrpc "google.golang.org/grpc"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
Expand Down Expand Up @@ -59,6 +66,8 @@ type ActorSystem interface {
// ActorSystem represent a collection of actors on a given node
// Only a single instance of the ActorSystem can be created on a given node
type actorSystem struct {
goaktv1connect.UnimplementedRemoteMessagingServiceHandler

// Specifies the actor system name
name string
// Specifies the node where the actor system is located
Expand All @@ -78,8 +87,6 @@ type actorSystem struct {

// observability settings
telemetry *telemetry.Telemetry
// specifies the remoting service
remotingService grpc.Server

typesLoader TypesLoader
reflection Reflection
Expand All @@ -99,17 +106,16 @@ func NewActorSystem(config *Config) (ActorSystem, error) {
// the function only gets called one
once.Do(func() {
system = &actorSystem{
name: config.Name(),
nodeAddr: config.NodeHostAndPort(),
actors: cmp.New[PID](),
logger: config.Logger(),
host: "",
port: 0,
config: config,
hasStarted: atomic.NewBool(false),
telemetry: config.telemetry,
remotingService: nil,
typesLoader: NewTypesLoader(nil),
name: config.Name(),
nodeAddr: config.NodeHostAndPort(),
actors: cmp.New[PID](),
logger: config.Logger(),
host: "",
port: 0,
config: config,
hasStarted: atomic.NewBool(false),
telemetry: config.telemetry,
typesLoader: NewTypesLoader(nil),
}
// set host and port
host, port := config.HostAndPort()
Expand Down Expand Up @@ -275,8 +281,8 @@ func (a *actorSystem) Start(ctx context.Context) error {
a.hasStarted.Store(true)

// start remoting when remoting is enabled
if err := a.enableRemoting(ctx); err != nil {
return err
if a.config.remotingEnabled {
go a.enableRemoting(ctx)
}

// start the metrics service
Expand All @@ -297,11 +303,6 @@ func (a *actorSystem) Stop(ctx context.Context) error {
defer span.End()
a.logger.Infof("%s ActorSystem is shutting down on Node=%s...", a.name, a.nodeAddr)

// stop remoting service when set
if a.remotingService != nil {
a.remotingService.Stop(ctx)
}

// short-circuit the shutdown process when there are no online actors
if len(a.Actors()) == 0 {
a.logger.Info("No online actors to shutdown. Shutting down successfully done")
Expand All @@ -326,13 +327,8 @@ func (a *actorSystem) Stop(ctx context.Context) error {
return nil
}

// RegisterService register the remoting service
func (a *actorSystem) RegisterService(srv *ggrpc.Server) {
goaktpb.RegisterRemoteMessagingServiceServer(srv, a)
}

// RemoteLookup for an actor on a remote host.
func (a *actorSystem) RemoteLookup(ctx context.Context, request *goaktpb.RemoteLookupRequest) (*goaktpb.RemoteLookupResponse, error) {
func (a *actorSystem) RemoteLookup(ctx context.Context, request *connect.Request[goaktpb.RemoteLookupRequest]) (*connect.Response[goaktpb.RemoteLookupResponse], error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RemoteLookup")
defer span.End()
Expand All @@ -341,13 +337,13 @@ func (a *actorSystem) RemoteLookup(ctx context.Context, request *goaktpb.RemoteL
logger := a.logger

// first let us make a copy of the incoming request
reqCopy := proto.Clone(request).(*goaktpb.RemoteLookupRequest)
reqCopy := request.Msg

// let us validate the host and port
hostAndPort := fmt.Sprintf("%s:%d", reqCopy.GetHost(), reqCopy.GetPort())
if hostAndPort != a.nodeAddr {
// log the error
logger.Error(ErrRemoteSendInvalidNode)
logger.Error(ErrRemoteSendInvalidNode.Message())
// here message is sent to the wrong actor system node
return nil, ErrRemoteSendInvalidNode
}
Expand All @@ -361,34 +357,34 @@ func (a *actorSystem) RemoteLookup(ctx context.Context, request *goaktpb.RemoteL
// return an error when the remote address is not found
if !exist {
// log the error
logger.Error(ErrRemoteActorNotFound(actorPath.String()))
logger.Error(ErrRemoteActorNotFound(actorPath.String()).Error())
return nil, ErrRemoteActorNotFound(actorPath.String())
}

// let us construct the address
addr := pid.ActorPath().RemoteAddress()

return &goaktpb.RemoteLookupResponse{Address: addr}, nil
return connect.NewResponse(&goaktpb.RemoteLookupResponse{Address: addr}), nil
}

// RemoteAsk is used to send a message to an actor remotely and expect a response
// immediately. With this type of message the receiver cannot communicate back to Sender
// except reply the message with a response. This one-way communication
func (a *actorSystem) RemoteAsk(ctx context.Context, request *goaktpb.RemoteAskRequest) (*goaktpb.RemoteAskResponse, error) {
func (a *actorSystem) RemoteAsk(ctx context.Context, request *connect.Request[goaktpb.RemoteAskRequest]) (*connect.Response[goaktpb.RemoteAskResponse], error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RemoteAsk")
defer span.End()

// get a context logger
logger := a.logger
// first let us make a copy of the incoming request
reqCopy := proto.Clone(request).(*goaktpb.RemoteAskRequest)
reqCopy := request.Msg

// let us validate the host and port
hostAndPort := fmt.Sprintf("%s:%d", reqCopy.GetReceiver().GetHost(), reqCopy.GetReceiver().GetPort())
if hostAndPort != a.nodeAddr {
// log the error
logger.Error(ErrRemoteSendInvalidNode)
logger.Error(ErrRemoteSendInvalidNode.Message())
// here message is sent to the wrong actor system node
return nil, ErrRemoteSendInvalidNode
}
Expand All @@ -402,7 +398,7 @@ func (a *actorSystem) RemoteAsk(ctx context.Context, request *goaktpb.RemoteAskR
// return an error when the remote address is not found
if !exist {
// log the error
logger.Error(ErrRemoteActorNotFound(actorPath.String()))
logger.Error(ErrRemoteActorNotFound(actorPath.String()).Error())
return nil, ErrRemoteActorNotFound(actorPath.String())
}
// restart the actor when it is not live
Expand All @@ -416,33 +412,33 @@ func (a *actorSystem) RemoteAsk(ctx context.Context, request *goaktpb.RemoteAskR
reply, err := a.handleRemoteAsk(ctx, pid, reqCopy.GetMessage())
// handle the error
if err != nil {
logger.Error(ErrRemoteSendFailure(err))
logger.Error(ErrRemoteSendFailure(err).Error())
return nil, ErrRemoteSendFailure(err)
}
// let us marshal the reply
marshaled, _ := anypb.New(reply)
return &goaktpb.RemoteAskResponse{Message: marshaled}, nil
return connect.NewResponse(&goaktpb.RemoteAskResponse{Message: marshaled}), nil
}

// RemoteTell is used to send a message to an actor remotely by another actor
// This is the only way remote actors can interact with each other. The actor on the
// other line can reply to the sender by using the Sender in the message
func (a *actorSystem) RemoteTell(ctx context.Context, request *goaktpb.RemoteTellRequest) (*goaktpb.RemoteTellResponse, error) {
func (a *actorSystem) RemoteTell(ctx context.Context, request *connect.Request[goaktpb.RemoteTellRequest]) (*connect.Response[goaktpb.RemoteTellResponse], error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RemoteTell")
defer span.End()

// get a context logger
logger := a.logger
// first let us make a copy of the incoming request
reqCopy := proto.Clone(request).(*goaktpb.RemoteTellRequest)
reqCopy := request.Msg

receiver := reqCopy.GetRemoteMessage().GetReceiver()
// let us validate the host and port
hostAndPort := fmt.Sprintf("%s:%d", receiver.GetHost(), receiver.GetPort())
if hostAndPort != a.nodeAddr {
// log the error
logger.Error(ErrRemoteSendInvalidNode)
logger.Error(ErrRemoteSendInvalidNode.Message())
// here message is sent to the wrong actor system node
return nil, ErrRemoteSendInvalidNode
}
Expand All @@ -461,7 +457,7 @@ func (a *actorSystem) RemoteTell(ctx context.Context, request *goaktpb.RemoteTel
// return an error when the remote address is not found
if !exist {
// log the error
logger.Error(ErrRemoteActorNotFound(actorPath.String()))
logger.Error(ErrRemoteActorNotFound(actorPath.String()).Error())
return nil, ErrRemoteActorNotFound(actorPath.String())
}
// restart the actor when it is not live
Expand All @@ -476,7 +472,7 @@ func (a *actorSystem) RemoteTell(ctx context.Context, request *goaktpb.RemoteTel
logger.Error(ErrRemoteSendFailure(err))
return nil, ErrRemoteSendFailure(err)
}
return &goaktpb.RemoteTellResponse{}, nil
return connect.NewResponse(new(goaktpb.RemoteTellResponse)), nil
}

// registerMetrics register the PID metrics with OTel instrumentation.
Expand All @@ -490,9 +486,14 @@ func (a *actorSystem) registerMetrics() error {
return err
}

// define the common labels
labels := []attribute.KeyValue{
attribute.String("actor.system", a.Name()),
}

// register the metrics
_, err = meter.RegisterCallback(func(ctx context.Context, observer metric.Observer) error {
observer.ObserveInt64(metrics.ActorSystemActorsCount, int64(a.NumActors()))
observer.ObserveInt64(metrics.ActorSystemActorsCount, int64(a.NumActors()), labels...)
return nil
}, metrics.ActorSystemActorsCount)

Expand All @@ -517,46 +518,61 @@ func (a *actorSystem) handleRemoteTell(ctx context.Context, to PID, message prot
}

// enableRemoting enables the remoting service to handle remote messaging
func (a *actorSystem) enableRemoting(ctx context.Context) error {
// start remoting when remoting is enabled
if a.config.remotingEnabled {
// build the grpc server
config := &grpc.Config{
ServiceName: a.Name(),
GrpcPort: int32(a.Port()),
GrpcHost: a.Host(),
TraceEnabled: false, // TODO
TraceURL: "", // TODO
EnableReflection: false,
Logger: a.logger,
}

// build the grpc service
remotingService, err := grpc.
GetServerBuilder(config).
WithService(a).
Build()
func (a *actorSystem) enableRemoting(ctx context.Context) {
// create a function to handle the observability
interceptor := func(tp trace.TracerProvider, mp metric.MeterProvider) connect.Interceptor {
return otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(tp),
otelconnect.WithMeterProvider(mp),
)
}

// handle the error
if err != nil {
a.logger.Error(errors.Wrap(err, "failed to start remoting service"))
return err
}
// create a http server mux
mux := http.NewServeMux()
// create the resource and handler
path, handler := goaktv1connect.NewRemoteMessagingServiceHandler(
a,
connect.WithInterceptors(interceptor(a.telemetry.TracerProvider, a.telemetry.MeterProvider)),
)
mux.Handle(path, handler)
// create the address
serverAddr := fmt.Sprintf(":%d", a.Port())
// create a http server instance
// TODO revisit the timeouts
// reference: https://adam-p.ca/blog/2022/01/golang-http-server-timeouts/
server := &http.Server{
Addr: serverAddr,
// The maximum duration for reading the entire request, including the body.
// It’s implemented in net/http by calling SetReadDeadline immediately after Accept
// ReadTimeout := handler_timeout + ReadHeaderTimeout + wiggle_room
ReadTimeout: 3 * time.Second,
// ReadHeaderTimeout is the amount of time allowed to read request headers
ReadHeaderTimeout: time.Second,
// WriteTimeout is the maximum duration before timing out writes of the response.
// It is reset whenever a new request’s header is read.
// This effectively covers the lifetime of the ServeHTTP handler stack
WriteTimeout: time.Second,
// IdleTimeout is the maximum amount of time to wait for the next request when keep-alive are enabled.
// If IdleTimeout is zero, the value of ReadTimeout is used. Not relevant to request timeouts
IdleTimeout: 1200 * time.Second,
// For gRPC clients, it's convenient to support HTTP/2 without TLS. You can
// avoid x/net/http2 by using http.ListenAndServeTLS.
Handler: h2c.NewHandler(mux, &http2.Server{
IdleTimeout: 1200 * time.Second,
}),
}

// set the remoting service
a.remotingService = remotingService
// start the remoting service
a.remotingService.Start(ctx)
// listen and serv er requests
if err := server.ListenAndServe(); err != nil {
a.logger.Panic(errors.Wrap(err, "failed to start remoting service"))
}
return nil
}

// reset the actor system
func (a *actorSystem) reset() {
// void the settings
a.config = nil
a.hasStarted = atomic.NewBool(false)
a.remotingService = nil
a.telemetry = nil
a.actors = cmp.New[PID]()
a.nodeAddr = ""
Expand Down
Loading

0 comments on commit e318b7f

Please sign in to comment.