Skip to content

Commit

Permalink
Support client heartbeat (#296)
Browse files Browse the repository at this point in the history
Implement spec change open-telemetry/opamp-spec#190
  • Loading branch information
haoqixu authored Aug 30, 2024
1 parent 536037b commit 7cdd395
Show file tree
Hide file tree
Showing 12 changed files with 618 additions and 388 deletions.
12 changes: 12 additions & 0 deletions client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,18 @@ func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) {
h.receiveProcessor.ProcessReceivedMessage(ctx, &response)
}

func (h *HTTPSender) SetHeartbeatInterval(duration time.Duration) error {
if duration <= 0 {
return errors.New("heartbeat interval for httpclient must be greater than zero")
}

if duration != 0 {
h.SetPollingInterval(duration)
}

return nil
}

// SetPollingInterval sets the interval between polling. Has effect starting from the
// next polling cycle.
func (h *HTTPSender) SetPollingInterval(duration time.Duration) {
Expand Down
20 changes: 20 additions & 0 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {
srv.Close()
}

func TestHTTPSenderSetHeartbeatInterval(t *testing.T) {
sender := NewHTTPSender(&sharedinternal.NopLogger{})

// Default interval should be 30s as per OpAMP Specification
assert.Equal(t, (30 * time.Second).Milliseconds(), sender.pollingIntervalMs)

// zero is invalid for http sender
assert.Error(t, sender.SetHeartbeatInterval(0))
assert.Equal(t, (30 * time.Second).Milliseconds(), sender.pollingIntervalMs)

// negative interval is invalid for http sender
assert.Error(t, sender.SetHeartbeatInterval(-1))
assert.Equal(t, (30 * time.Second).Milliseconds(), sender.pollingIntervalMs)

// zero should be valid for http sender
expected := 10 * time.Second
assert.NoError(t, sender.SetHeartbeatInterval(expected))
assert.Equal(t, expected.Milliseconds(), sender.pollingIntervalMs)
}

func TestAddTLSConfig(t *testing.T) {
sender := NewHTTPSender(&sharedinternal.NopLogger{})

Expand Down
8 changes: 8 additions & 0 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand Down Expand Up @@ -216,6 +217,13 @@ func (r *receivedProcessor) rcvOpampConnectionSettings(ctx context.Context, sett
return
}

if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat) {
interval := time.Duration(settings.Opamp.HeartbeatIntervalSeconds) * time.Second
if err := r.sender.SetHeartbeatInterval(interval); err != nil {
r.logger.Errorf(ctx, "Failed to set heartbeat interval: %v", err)
}
}

if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings) {
err := r.callbacks.OnOpampConnectionSettings(ctx, settings.Opamp)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions client/internal/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"errors"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -22,6 +23,9 @@ type Sender interface {

// SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent.
SetInstanceUid(instanceUid types.InstanceUid) error

// SetHeartbeatInterval sets the interval for the agent heartbeats.
SetHeartbeatInterval(duration time.Duration) error
}

// SenderCommon is partial Sender implementation that is common between WebSocket and plain
Expand Down
62 changes: 58 additions & 4 deletions client/internal/wssender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package internal

import (
"context"
"errors"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
Expand All @@ -13,7 +15,8 @@ import (
)

const (
defaultSendCloseMessageTimeout = 5 * time.Second
defaultSendCloseMessageTimeout = 5 * time.Second
defaultHeartbeatIntervalSeconds = 30
)

// WSSender implements the WebSocket client's sending portion of OpAMP protocol.
Expand All @@ -25,15 +28,24 @@ type WSSender struct {
// Indicates that the sender has fully stopped.
stopped chan struct{}
err error

heartbeatIntervalUpdated chan struct{}
heartbeatIntervalSeconds atomic.Int64
heartbeatTimer *time.Timer
}

// NewSender creates a new Sender that uses WebSocket to send
// messages to the server.
func NewSender(logger types.Logger) *WSSender {
return &WSSender{
logger: logger,
SenderCommon: NewSenderCommon(),
s := &WSSender{
logger: logger,
heartbeatIntervalUpdated: make(chan struct{}, 1),
heartbeatTimer: time.NewTimer(0),
SenderCommon: NewSenderCommon(),
}
s.heartbeatIntervalSeconds.Store(defaultHeartbeatIntervalSeconds)

return s
}

// Start the sender and send the first message that was set via NextMessage().Update()
Expand Down Expand Up @@ -62,10 +74,51 @@ func (s *WSSender) StoppingErr() error {
return s.err
}

// SetHeartbeatInterval sets the heartbeat interval and triggers timer reset.
func (s *WSSender) SetHeartbeatInterval(d time.Duration) error {
if d < 0 {
return errors.New("heartbeat interval for wsclient must be non-negative")
}

s.heartbeatIntervalSeconds.Store(int64(d.Seconds()))
select {
case s.heartbeatIntervalUpdated <- struct{}{}:
default:
}
return nil
}

func (s *WSSender) shouldSendHeartbeat() <-chan time.Time {
t := s.heartbeatTimer

// Before Go 1.23, the only safe way to use Reset was to [Stop] and
// explicitly drain the timer first.
// ref: https://pkg.go.dev/time#Timer.Reset
if !t.Stop() {
select {
case <-t.C:
default:
}
}

if d := time.Duration(s.heartbeatIntervalSeconds.Load()) * time.Second; d != 0 {
t.Reset(d)
return t.C
}

// Heartbeat interval is set to Zero, disable heartbeat.
return nil
}

func (s *WSSender) run(ctx context.Context) {
out:
for {
select {
case <-s.shouldSendHeartbeat():
s.NextMessage().Update(func(msg *protobufs.AgentToServer) {})
s.ScheduleSend()
case <-s.heartbeatIntervalUpdated:
// trigger heartbeat timer reset
case <-s.hasPendingMessage:
s.sendNextMessage(ctx)

Expand All @@ -77,6 +130,7 @@ out:
}
}

s.heartbeatTimer.Stop()
close(s.stopped)
}

Expand Down
28 changes: 28 additions & 0 deletions client/internal/wssender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package internal

import (
"testing"
"time"

sharedinternal "github.com/open-telemetry/opamp-go/internal"
"github.com/stretchr/testify/assert"
)

func TestWSSenderSetHeartbeatInterval(t *testing.T) {
sender := NewSender(&sharedinternal.NopLogger{})

// Default interval should be 30s as per OpAMP Specification
assert.Equal(t, int64((30 * time.Second).Seconds()), sender.heartbeatIntervalSeconds.Load())

// negative interval is invalid for http sender
assert.Error(t, sender.SetHeartbeatInterval(-1))
assert.Equal(t, int64((30 * time.Second).Seconds()), sender.heartbeatIntervalSeconds.Load())

// zero is valid for ws sender
assert.NoError(t, sender.SetHeartbeatInterval(0))
assert.Equal(t, int64(0), sender.heartbeatIntervalSeconds.Load())

var expected int64 = 10
assert.NoError(t, sender.SetHeartbeatInterval(time.Duration(expected)*time.Second))
assert.Equal(t, expected, sender.heartbeatIntervalSeconds.Load())
}
71 changes: 71 additions & 0 deletions client/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,77 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

func TestWSSenderReportsHeartbeat(t *testing.T) {
tests := []struct {
name string
clientEnableHeartbeat bool
serverEnableHeartbeat bool
expectHeartbeats bool
}{
{"enable heartbeat", true, true, true},
{"client disable heartbeat", false, true, false},
{"server disable heartbeat", true, false, false},
}

for _, tt := range tests {
srv := internal.StartMockServer(t)

var firstMsg atomic.Bool
var conn atomic.Value
srv.OnWSConnect = func(c *websocket.Conn) {
conn.Store(c)
firstMsg.Store(true)
}
var msgCount atomic.Int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if firstMsg.Load() {
firstMsg.Store(false)
resp := &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Opamp: &protobufs.OpAMPConnectionSettings{
HeartbeatIntervalSeconds: 1,
},
},
}
if !tt.serverEnableHeartbeat {
resp.ConnectionSettings.Opamp.HeartbeatIntervalSeconds = 0
}
return resp
}
msgCount.Add(1)
return nil
}

// Start an OpAMP/WebSocket client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
}
if tt.clientEnableHeartbeat {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat
}
client := NewWebSocket(nil)
startClient(t, settings, client)

// Wait for connection to be established.
eventually(t, func() bool { return conn.Load() != nil })

if tt.expectHeartbeats {
assert.Eventually(t, func() bool {
return msgCount.Load() >= 2
}, 3*time.Second, 10*time.Millisecond)
} else {
assert.Never(t, func() bool {
return msgCount.Load() >= 2
}, 3*time.Second, 10*time.Millisecond)
}

// Stop the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
}
}

func TestDisconnectWSByServer(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
Expand Down
6 changes: 3 additions & 3 deletions internal/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ func CreateTLSCert(caCertPath, caKeyPath string) (*protobufs.TLSCertificate, err

// We have a client certificate with a public and private key.
certificate := &protobufs.TLSCertificate{
PublicKey: publicKeyPEM.Bytes(),
PrivateKey: privateKeyPEM.Bytes(),
CaPublicKey: caCertBytes,
Cert: publicKeyPEM.Bytes(),
PrivateKey: privateKeyPEM.Bytes(),
CaCert: caCertBytes,
}

return certificate, nil
Expand Down
8 changes: 4 additions & 4 deletions internal/examples/agent/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,15 +536,15 @@ func (agent *Agent) getCertFromSettings(certificate *protobufs.TLSCertificate) (
// Client-initiated CSR flow. This is currently initiated when connecting
// to the Server for the first time (see requestClientCertificate()).
cert, err = tls.X509KeyPair(
certificate.PublicKey, // We received the certificate from the Server.
certificate.Cert, // We received the certificate from the Server.
agent.clientPrivateKeyPEM, // Private key was earlier locally generated.
)
} else {
// Server-initiated flow. This is currently initiated by user clicking a button in
// the Server UI.
// Both certificate and private key are from the Server.
cert, err = tls.X509KeyPair(
certificate.PublicKey,
certificate.Cert,
certificate.PrivateKey,
)
}
Expand All @@ -554,8 +554,8 @@ func (agent *Agent) getCertFromSettings(certificate *protobufs.TLSCertificate) (
return nil, err
}

if len(certificate.CaPublicKey) != 0 {
caCertPB, _ := pem.Decode(certificate.CaPublicKey)
if len(certificate.CaCert) != 0 {
caCertPB, _ := pem.Decode(certificate.CaCert)
caCert, err := x509.ParseCertificate(caCertPB.Bytes)
if err != nil {
agent.logger.Errorf(context.Background(), "Cannot parse CA cert: %v", err)
Expand Down
10 changes: 5 additions & 5 deletions internal/examples/server/certman/certman.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func CreateClientTLSCertFromCSR(csr *x509.CertificateRequest) (*protobufs.TLSCer

// We have a client certificate with a public and private key.
certificate := &protobufs.TLSCertificate{
PublicKey: certPEM.Bytes(),
CaPublicKey: caCertBytes,
Cert: certPEM.Bytes(),
CaCert: caCertBytes,
}

return certificate, nil
Expand Down Expand Up @@ -144,9 +144,9 @@ func CreateClientTLSCert() (*protobufs.TLSCertificate, error) {

// We have a client certificate with a public and private key.
certificate := &protobufs.TLSCertificate{
PublicKey: certPEM.Bytes(),
PrivateKey: privateKeyPEM.Bytes(),
CaPublicKey: caCertBytes,
Cert: certPEM.Bytes(),
PrivateKey: privateKeyPEM.Bytes(),
CaCert: caCertBytes,
}

return certificate, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/opamp-spec
Loading

0 comments on commit 7cdd395

Please sign in to comment.