Skip to content

Commit

Permalink
feat: add TLS support (#597)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Jan 16, 2025
1 parent b7db344 commit 03bc08b
Show file tree
Hide file tree
Showing 32 changed files with 2,383 additions and 1,128 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
fetch-depth: 0
- uses: actions/setup-go@v5
with:
go-version: '>=1.22.0'
go-version: '1.22.0'
check-latest: true
cache-dependency-path: "**/*.sum"
- run: go version
Expand All @@ -47,7 +47,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '>=1.22.0'
go-version: '1.22.0'
check-latest: true
cache-dependency-path: "**/*.sum"
- name: golangci-lint
Expand Down
70 changes: 70 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"

on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '28 20 * * 0'

jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write

strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://git.io/codeql-language-support

steps:
- name: Checkout repository
uses: actions/checkout@v4

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main

# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v2

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl

# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language

#- run: |
# make bootstrap
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2
4 changes: 2 additions & 2 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
fetch-depth: 0
- uses: actions/setup-go@v5
with:
go-version: '>=1.22.0'
go-version: '1.22.0'
check-latest: true
cache-dependency-path: "**/*.sum"
- run: go version
Expand All @@ -51,7 +51,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '>=1.22.0'
go-version: '1.22.0'
check-latest: true
cache-dependency-path: "**/*.sum"
- name: golangci-lint
Expand Down
129 changes: 94 additions & 35 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package actors

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -292,6 +293,9 @@ type actorSystem struct {

actorsCounter *atomic.Uint64
deadlettersCounter *atomic.Uint64

tlsClientConfig *tls.Config
tlsServerConfig *tls.Config
}

var (
Expand Down Expand Up @@ -365,10 +369,13 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
}
}

system.scheduler = newScheduler(system.logger,
system.shutdownTimeout,
withSchedulerCluster(system.cluster),
withSchedulerRemoting(NewRemoting()))
// perform some quick validations on the TLS configurations
if (system.tlsServerConfig == nil) != (system.tlsClientConfig == nil) {
return nil, ErrInvalidTLSConfiguration
}

// append the right protocols to the TLS settings
system.ensureTLSProtos()

return system, nil
}
Expand Down Expand Up @@ -452,7 +459,7 @@ func (x *actorSystem) Start(ctx context.Context) error {
return err
}

x.scheduler.Start(ctx)
x.startMessagesScheduler(ctx)
x.startedAt.Store(time.Now().Unix())
x.logger.Infof("%s actor system successfully started..:)", x.name)
return nil
Expand Down Expand Up @@ -1356,6 +1363,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
cluster.WithWriteQuorum(x.clusterConfig.WriteQuorum()),
cluster.WithReadQuorum(x.clusterConfig.ReadQuorum()),
cluster.WithReplicaCount(x.clusterConfig.ReplicaCount()),
cluster.WithTLS(x.tlsServerConfig, x.tlsClientConfig),
)
if err != nil {
x.logger.Errorf("failed to initialize cluster engine: %v", err)
Expand Down Expand Up @@ -1403,6 +1411,7 @@ func (x *actorSystem) enableRemoting(ctx context.Context) error {
if !x.remotingEnabled.Load() {
return nil
}

x.logger.Info("enabling remoting...")
remotingServicePath, remotingServiceHandler := internalpbconnect.NewRemotingServiceHandler(x)
clusterServicePath, clusterServiceHandler := internalpbconnect.NewClusterServiceHandler(x)
Expand All @@ -1429,11 +1438,49 @@ func (x *actorSystem) enableRemoting(ctx context.Context) error {
}
}()

x.remoting = NewRemoting()
// configure remoting
x.setRemoting()
x.logger.Info("remoting enabled...:)")
return nil
}

// setRemoting sets the remoting service
func (x *actorSystem) setRemoting() {
if x.tlsClientConfig != nil {
x.remoting = NewRemoting(WithRemotingTLS(x.tlsClientConfig))
return
}
x.remoting = NewRemoting()
}

// startMessagesScheduler starts the messages scheduler
func (x *actorSystem) startMessagesScheduler(ctx context.Context) {
// set the scheduler
x.scheduler = newScheduler(x.logger,
x.shutdownTimeout,
withSchedulerCluster(x.cluster),
withSchedulerRemoting(x.remoting))
// start the scheduler
x.scheduler.Start(ctx)
}

func (x *actorSystem) ensureTLSProtos() {
if x.tlsServerConfig != nil && x.tlsClientConfig != nil {
// ensure that the required protocols are set for the TLS
toAdd := []string{"h2", "http/1.1"}

// server application protocols setting
protos := goset.NewSet[string](x.tlsServerConfig.NextProtos...)
protos.Append(toAdd...)
x.tlsServerConfig.NextProtos = protos.ToSlice()

// client application protocols setting
protos = goset.NewSet[string](x.tlsClientConfig.NextProtos...)
protos.Append(toAdd...)
x.tlsClientConfig.NextProtos = protos.ToSlice()
}
}

// reset the actor system
func (x *actorSystem) reset() {
x.actors.Reset()
Expand Down Expand Up @@ -1700,6 +1747,7 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
withActorSystem(x),
withEventsStream(x.eventsStream),
withInitTimeout(x.actorInitTimeout),
withRemoting(x.remoting),
}

spawnConfig := newSpawnConfig(opts...)
Expand Down Expand Up @@ -1772,22 +1820,31 @@ func (x *actorSystem) shutdownHTTPServer(ctx context.Context) error {
func (x *actorSystem) configureServer(ctx context.Context, mux *nethttp.ServeMux) error {
hostPort := net.JoinHostPort(x.host, strconv.Itoa(int(x.port)))
httpServer := getServer(ctx, hostPort)
// create a tcp listener
lnr, err := net.Listen("tcp", hostPort)
listener, err := tcp.NewKeepAliveListener(httpServer.Addr)
if err != nil {
return err
}

// set the http server
// Configure HTTP/2 with performance tuning
http2Server := &http2.Server{
MaxConcurrentStreams: 1000, // Allow up to 1000 concurrent streams
MaxReadFrameSize: 10 << 20, // 10 MB max frame size
IdleTimeout: 1200 * time.Second, // Timeout for idle connections
}

// set the http TLS server
if x.tlsServerConfig != nil {
x.server = httpServer
x.server.TLSConfig = x.tlsServerConfig
x.server.Handler = mux
x.listener = tls.NewListener(listener, x.tlsServerConfig)
return http2.ConfigureServer(x.server, http2Server)
}

// http/2 server with h2c (HTTP/2 Cleartext).
x.server = httpServer
// For gRPC clients, it's convenient to support HTTP/2 without TLS.
x.server.Handler = h2c.NewHandler(
mux, &http2.Server{
IdleTimeout: 1200 * time.Second,
},
)
// set the non-secure http server
x.listener = lnr
x.server.Handler = h2c.NewHandler(mux, http2Server)
x.listener = listener
return nil
}

Expand Down Expand Up @@ -1879,25 +1936,27 @@ func (x *actorSystem) spawnJanitor(ctx context.Context) error {

// spawnRebalancer creates the cluster rebalancer
func (x *actorSystem) spawnRebalancer(ctx context.Context) error {
var err error
actorName := x.reservedName(rebalancerType)
x.rebalancer, err = x.configPID(ctx,
actorName,
newRebalancer(x.reflection),
WithSupervisorStrategies(
NewSupervisorStrategy(PanicError{}, NewRestartDirective()),
NewSupervisorStrategy(&runtime.PanicNilError{}, NewRestartDirective()),
NewSupervisorStrategy(rebalancingError{}, NewRestartDirective()),
NewSupervisorStrategy(InternalError{}, NewResumeDirective()),
NewSupervisorStrategy(SpawnError{}, NewResumeDirective()),
),
)
if err != nil {
return fmt.Errorf("actor=%s failed to start cluster rebalancer: %w", actorName, err)
}
if x.clusterEnabled.Load() {
var err error
actorName := x.reservedName(rebalancerType)
x.rebalancer, err = x.configPID(ctx,
actorName,
newRebalancer(x.reflection, x.remoting),
WithSupervisorStrategies(
NewSupervisorStrategy(PanicError{}, NewRestartDirective()),
NewSupervisorStrategy(&runtime.PanicNilError{}, NewRestartDirective()),
NewSupervisorStrategy(rebalancingError{}, NewRestartDirective()),
NewSupervisorStrategy(InternalError{}, NewResumeDirective()),
NewSupervisorStrategy(SpawnError{}, NewResumeDirective()),
),
)
if err != nil {
return fmt.Errorf("actor=%s failed to start cluster rebalancer: %w", actorName, err)
}

// the rebalancer is a child actor of the system guardian
_ = x.actors.AddNode(x.systemGuardian, x.rebalancer)
// the rebalancer is a child actor of the system guardian
_ = x.actors.AddNode(x.systemGuardian, x.rebalancer)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ package actors

import (
"context"
"errors"
"net"
"strconv"
"testing"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/travisjeffery/go-dynaport"
Expand Down
2 changes: 2 additions & 0 deletions actors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ var (
ErrPriorityMessageRequired = errors.New("priority message type is required")
// ErrActorAlreadyExists is returned when trying to create the same actor more than once
ErrActorAlreadyExists = func(actorName string) error { return fmt.Errorf("actor=(%s) already exists", actorName) }
// ErrInvalidTLSConfiguration is returned whent the TLS configuration is not properly set
ErrInvalidTLSConfiguration = errors.New("TLS configuration is invalid")
)

// eof returns true if the given error is an EOF error
Expand Down
Loading

0 comments on commit 03bc08b

Please sign in to comment.