Skip to content

Commit

Permalink
rework proxy, headers emulation
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Nov 12, 2024
1 parent 50ff1e0 commit dba55cf
Show file tree
Hide file tree
Showing 39 changed files with 621 additions and 448 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ go 1.23.0
require (
github.com/FZambia/eagle v0.1.0
github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b
github.com/centrifugal/centrifuge v0.33.5-0.20241104073442-b695b2eb669d
github.com/centrifugal/protocol v0.13.5-0.20241030080628-ab8a125839c1
github.com/centrifugal/centrifuge v0.33.5-0.20241111162802-ddd7cc1e7267
github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e
github.com/cristalhq/jwt/v5 v5.4.0
github.com/go-viper/mapstructure/v2 v2.1.0
github.com/gobwas/glob v0.2.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/centrifugal/centrifuge v0.33.5-0.20241104073442-b695b2eb669d h1:k53DsmeNfhw7KbM+T8qeIOL23vI3ewLK1vmv+IYb04U=
github.com/centrifugal/centrifuge v0.33.5-0.20241104073442-b695b2eb669d/go.mod h1:yvzNn5hq/bFBpoXQwM8HbU481pAXQkyP2tzvJgFsiN8=
github.com/centrifugal/protocol v0.13.5-0.20241030080628-ab8a125839c1 h1:itkwGpNchf1X8djN+S90koPptcnxzRGh2atzdw6ZZtc=
github.com/centrifugal/protocol v0.13.5-0.20241030080628-ab8a125839c1/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
github.com/centrifugal/centrifuge v0.33.5-0.20241111162802-ddd7cc1e7267 h1:qCDl370NqiN1XsVnpaHnnH3ixLM0oNBujHIphoI/4/Q=
github.com/centrifugal/centrifuge v0.33.5-0.20241111162802-ddd7cc1e7267/go.mod h1:enLQkNNo05bv/a2fKWHS2IyhrE91TQJghpygTKtqfmM=
github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e h1:+GbuEwJybDuHz6e8S17t/f0I4aTDnZjk37c0aGNFbwc=
github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down
30 changes: 19 additions & 11 deletions internal/client/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"unicode"

"github.com/centrifugal/centrifugo/v5/internal/clientcontext"
"github.com/centrifugal/centrifugo/v5/internal/clientstorage"
"github.com/centrifugal/centrifugo/v5/internal/config"
"github.com/centrifugal/centrifugo/v5/internal/configtypes"
Expand All @@ -31,7 +32,7 @@ type ProxyMap struct {
SubscribeStreamProxies map[string]*proxy.SubscribeStreamProxy
}

// Handler ...
// Handler for client connections.
type Handler struct {
node *centrifuge.Node
cfgContainer *config.Container
Expand All @@ -41,7 +42,7 @@ type Handler struct {
rpcExtension map[string]RPCExtensionFunc
}

// NewHandler ...
// NewHandler creates new Handler.
func NewHandler(
node *centrifuge.Node,
cfgContainer *config.Container,
Expand Down Expand Up @@ -337,7 +338,7 @@ func (h *Handler) OnClientConnecting(

processClientChannels = true
} else if connectProxyHandler != nil {
connectReply, _, err := connectProxyHandler(ctx, e)
connectReply, _, err := connectProxyHandler(clientcontext.SetEmulatedHeadersToContext(ctx, e.Headers), e)
if err != nil {
return centrifuge.ConnectReply{}, err
}
Expand Down Expand Up @@ -414,7 +415,7 @@ func (h *Handler) OnClientConnecting(
return centrifuge.ConnectReply{}, centrifuge.ErrorInternal
}

if !isPrivate && chOpts.SubscribeProxyName == "" && validChannelName {
if !isPrivate && !chOpts.SubscribeProxyEnabled && validChannelName {
if isUserLimited && chOpts.UserLimitedChannels && (userID != "" && h.cfgContainer.UserAllowed(ch, userID)) {
channelOk = true
} else if chOpts.SubscribeForClient && (userID != "" || chOpts.SubscribeForAnonymous) {
Expand Down Expand Up @@ -450,6 +451,13 @@ func (h *Handler) OnClientConnecting(
Data: data,
ClientSideRefresh: !refreshProxyEnabled,
}
if len(e.Headers) > 0 {
if newCtx != nil {
newCtx = clientcontext.SetEmulatedHeadersToContext(newCtx, e.Headers)
} else {
newCtx = clientcontext.SetEmulatedHeadersToContext(ctx, e.Headers)
}
}
if newCtx != nil {
finalReply.Context = newCtx
}
Expand Down Expand Up @@ -695,17 +703,17 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr
} else if isUserLimitedChannel && h.cfgContainer.UserAllowed(e.Channel, c.UserID()) {
allowed = true
options.Source = subsource.UserLimited
} else if (chOpts.SubscribeProxyName != "") && !isUserLimitedChannel {
} else if (chOpts.SubscribeProxyEnabled) && !isUserLimitedChannel {
if subscribeProxyHandler == nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "subscribe proxy not enabled", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()}))
return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorNotAvailable
}
r, _, err := subscribeProxyHandler(c, e, chOpts, getPerCallData(c))
if chOpts.SubRefreshProxyName != "" {
if chOpts.SubRefreshProxyEnabled {
r.ClientSideRefresh = false
}
return r, SubscribeExtra{}, err
} else if (chOpts.SubscribeStreamProxyName != "") && !isUserLimitedChannel {
} else if (chOpts.SubscribeStreamProxyEnabled) && !isUserLimitedChannel {
if subscribeStreamHandlerFunc == nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "stream proxy not enabled", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()}))
return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorNotAvailable
Expand All @@ -717,7 +725,7 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr
storage["stream_publisher_"+e.Channel] = publishFunc
release(storage)
}
if chOpts.SubRefreshProxyName != "" {
if chOpts.SubRefreshProxyEnabled {
r.ClientSideRefresh = false
}
return r, SubscribeExtra{}, err
Expand Down Expand Up @@ -748,7 +756,7 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr

return centrifuge.SubscribeReply{
Options: options,
ClientSideRefresh: chOpts.SubRefreshProxyName == "",
ClientSideRefresh: !chOpts.SubRefreshProxyEnabled,
}, SubscribeExtra{}, nil
}

Expand All @@ -771,13 +779,13 @@ func (h *Handler) OnPublish(c Client, e centrifuge.PublishEvent, publishProxyHan

var allowed bool

if chOpts.PublishProxyName != "" {
if chOpts.PublishProxyEnabled {
if publishProxyHandler == nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "publish proxy not enabled", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()}))
return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable
}
return publishProxyHandler(c, e, chOpts, getPerCallData(c))
} else if chOpts.SubscribeStreamProxyName != "" {
} else if chOpts.SubscribeStreamProxyEnabled {
if !chOpts.SubscribeStreamBidirectional {
return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable
}
Expand Down
52 changes: 50 additions & 2 deletions internal/client/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,9 +1068,9 @@ func TestClientOnSubscribe_UserLimitedChannelDoesNotCallProxy(t *testing.T) {
defer func() { _ = node.Shutdown(context.Background()) }()

cfg := config.DefaultConfig()
cfg.UnifiedProxy.SubscribeEndpoint = "http://localhost:8080"
cfg.Channel.Proxy.Subscribe.Endpoint = "http://localhost:8080"
cfg.Channel.WithoutNamespace.UserLimitedChannels = true
cfg.Channel.WithoutNamespace.SubscribeProxyName = "unified"
cfg.Channel.WithoutNamespace.SubscribeProxyEnabled = true
cfgContainer, err := config.NewContainer(cfg)
require.NoError(t, err)

Expand Down Expand Up @@ -1147,3 +1147,51 @@ func TestClientOnSubscribe_UserLimitedChannelNotAllowedForAnotherUser(t *testing
}, proxyFunc, nil)
require.ErrorIs(t, err, centrifuge.ErrorPermissionDenied)
}

func TestClientOnSubscribe_SubRefreshProxy(t *testing.T) {
node := tools.NodeWithMemoryEngineNoHandlers()
defer func() { _ = node.Shutdown(context.Background()) }()

cfg := config.DefaultConfig()
cfg.Channel.Proxy.Subscribe.Endpoint = "http://localhost:8080"
cfg.Channel.WithoutNamespace.SubscribeProxyEnabled = true
cfgContainer, err := config.NewContainer(cfg)
require.NoError(t, err)

h := NewHandler(node, cfgContainer, nil, nil, &ProxyMap{})

numProxyCalls := 0

proxyFunc := func(c proxy.Client, e centrifuge.SubscribeEvent, chOpts configtypes.ChannelOptions, pcd proxy.PerCallData) (centrifuge.SubscribeReply, proxy.SubscribeExtra, error) {
numProxyCalls++
return centrifuge.SubscribeReply{
ClientSideRefresh: !chOpts.SubRefreshProxyEnabled,
}, proxy.SubscribeExtra{}, nil
}

reply, _, err := h.OnSubscribe(&tools.TestClientMock{
UserIDFunc: func() string {
return "42"
},
}, centrifuge.SubscribeEvent{
Channel: "user",
}, proxyFunc, nil)
require.NoError(t, err)
require.Equal(t, 1, numProxyCalls)
require.True(t, reply.ClientSideRefresh)

cfg.Channel.WithoutNamespace.SubRefreshProxyEnabled = true
cfg.Channel.Proxy.SubRefresh.Endpoint = "https://example.com"
err = cfgContainer.Reload(cfg)
require.NoError(t, err)
reply, _, err = h.OnSubscribe(&tools.TestClientMock{
UserIDFunc: func() string {
return "42"
},
}, centrifuge.SubscribeEvent{
Channel: "user",
}, proxyFunc, nil)
require.NoError(t, err)
require.Equal(t, 2, numProxyCalls)
require.False(t, reply.ClientSideRefresh)
}
24 changes: 24 additions & 0 deletions internal/clientcontext/emulated_headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package clientcontext

import (
"context"
)

type emulatedHeadersKey struct{}

// GetEmulatedHeadersFromContext returns emulated headers from context.
func GetEmulatedHeadersFromContext(ctx context.Context) (map[string]string, bool) {
if val := ctx.Value(emulatedHeadersKey{}); val != nil {
values, ok := val.(map[string]string)
return values, ok
}
return nil, false
}

// SetEmulatedHeadersToContext sets header map to context.
func SetEmulatedHeadersToContext(ctx context.Context, h map[string]string) context.Context {
if len(h) == 0 {
return ctx
}
return context.WithValue(ctx, emulatedHeadersKey{}, h)
}
19 changes: 4 additions & 15 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,15 @@ type Config struct {
Channel configtypes.Channel `mapstructure:"channel" json:"channel" envconfig:"channel" toml:"channel" yaml:"channel"`
// RPC is a configuration for client RPC calls.
RPC configtypes.RPC `mapstructure:"rpc" json:"rpc" envconfig:"rpc" toml:"rpc" yaml:"rpc"`
// Proxies is an array of proxies with custom names for the more granular control of channel-related events
// in different channel namespaces.
Proxies configtypes.NamedProxies `mapstructure:"proxies" default:"[]" json:"proxies" envconfig:"proxies" yaml:"proxies" toml:"proxies"`

// HttpAPI is a configuration for HTTP server API. It's enabled by default.
HttpAPI configtypes.HttpAPI `mapstructure:"http_api" json:"http_api" envconfig:"http_api" toml:"http_api" yaml:"http_api"`
// GrpcAPI is a configuration for gRPC server API. It's disabled by default.
GrpcAPI configtypes.GrpcAPI `mapstructure:"grpc_api" json:"grpc_api" envconfig:"grpc_api" toml:"grpc_api" yaml:"grpc_api"`

// UnifiedProxy is a helper configuration for events proxy. It can be referenced using UnifiedProxyName name.
UnifiedProxy configtypes.UnifiedProxy `mapstructure:"unified_proxy" json:"unified_proxy" envconfig:"unified_proxy" toml:"unified_proxy" yaml:"unified_proxy"`
// Proxies is a configuration for granular events proxies. See also UnifiedProxy.
Proxies configtypes.Proxies `mapstructure:"proxies" default:"[]" json:"proxies" envconfig:"proxies" toml:"proxies" yaml:"proxies"`

// Consumers is a configuration for message queue consumers. For example, Centrifugo can consume
// messages from PostgreSQL transactional outbox table, or from Kafka topics.
Consumers configtypes.Consumers `mapstructure:"consumers" default:"[]" json:"consumers" envconfig:"consumers" toml:"consumers" yaml:"consumers"`
Expand Down Expand Up @@ -242,7 +240,7 @@ func GetConfig(cmd *cobra.Command, configFile string) (Config, Meta, error) {
for i, item := range conf.Proxies {
varInfo, err = envconfig.Process("CENTRIFUGO_PROXIES_"+item.Name, &item)
if err != nil {
return Config{}, Meta{}, fmt.Errorf("error processing env proxies: %w", err)
return Config{}, Meta{}, fmt.Errorf("error processing env named proxies: %w", err)
}
conf.Proxies[i] = item
extendKnownEnvVars(knownEnvVars, varInfo)
Expand All @@ -257,15 +255,6 @@ func GetConfig(cmd *cobra.Command, configFile string) (Config, Meta, error) {
extendKnownEnvVars(knownEnvVars, varInfo)
}

for i, header := range conf.UnifiedProxy.HttpHeaders {
conf.UnifiedProxy.HttpHeaders[i] = strings.ToLower(header)
}
for i, proxy := range conf.Proxies {
for j, header := range proxy.HttpHeaders {
conf.Proxies[i].HttpHeaders[j] = strings.ToLower(header)
}
}

meta.UnknownKeys = findUnknownKeys(v.AllSettings(), conf, "")
meta.UnknownEnvs = checkEnvironmentVars(knownEnvVars)
meta.KnownEnvVars = knownEnvVars
Expand Down
6 changes: 3 additions & 3 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestConfigEnvVars(t *testing.T) {
_ = os.Setenv("CENTRIFUGO_CONSUMERS_KAFKA_KAFKA_TLS_ENABLED", "false")
_ = os.Setenv("CENTRIFUGO_UNKNOWN_ENV", "1")
_ = os.Setenv("CENTRIFUGO_CHANNEL_NAMESPACES", `[{"name": "env"}]`)
_ = os.Setenv("CENTRIFUGO_UNIFIED_PROXY_HTTP_STATIC_HEADERS", `{"key": "value"}`)
_ = os.Setenv("CENTRIFUGO_CLIENT_PROXY_CONNECT_HTTP_STATIC_HEADERS", `{"key": "value"}`)
_ = os.Setenv("CENTRIFUGO_WEBSOCKET_WRITE_TIMEOUT", `300ms`)
_ = os.Setenv("CENTRIFUGO_PROXIES", `[]`)
defer func() {
Expand All @@ -67,7 +67,7 @@ func TestConfigEnvVars(t *testing.T) {
_ = os.Unsetenv("CENTRIFUGO_CLIENT_ALLOWED_ORIGINS")
_ = os.Unsetenv("CENTRIFUGO_CLIENT_TOKEN_JWKS_PUBLIC_ENDPOINT")
_ = os.Unsetenv("CENTRIFUGO_CHANNEL_NAMESPACES")
_ = os.Unsetenv("CENTRIFUGO_UNIFIED_PROXY_HTTP_STATIC_HEADERS")
_ = os.Unsetenv("CENTRIFUGO_CLIENT_PROXY_CONNECT_HTTP_STATIC_HEADERS")
_ = os.Unsetenv("CENTRIFUGO_WEBSOCKET_WRITE_TIMEOUT")
_ = os.Unsetenv("CENTRIFUGO_PROXIES")
}()
Expand All @@ -81,7 +81,7 @@ func TestConfigEnvVars(t *testing.T) {
require.Len(t, meta.UnknownEnvs, 1)
require.Len(t, meta.UnknownKeys, 0)
require.Contains(t, meta.UnknownEnvs, "CENTRIFUGO_UNKNOWN_ENV")
require.Equal(t, configtypes.MapStringString(map[string]string{"key": "value"}), conf.UnifiedProxy.HTTP.StaticHeaders)
require.Equal(t, configtypes.MapStringString(map[string]string{"key": "value"}), conf.Client.Proxy.Connect.HTTP.StaticHeaders)
require.Equal(t, configtypes.Duration(300*time.Millisecond), conf.WebSocket.WriteTimeout)
require.Len(t, conf.Proxies, 0)
}
2 changes: 1 addition & 1 deletion internal/config/const.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package config

const (
UnifiedProxyName = "unified"
DefaultProxyName = "default"
TransportErrorMode = "transport"
)
18 changes: 10 additions & 8 deletions internal/config/testdata/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@
"token": {
"jwks_public_endpoint": "https://example.com/jwks"
},
"ping_interval": "12s"
"ping_interval": "12s",
"proxy": {
"connect": {
"http": {
"static_headers": {
"x": "y"
}
}
}
}
},
"channel": {
"without_namespace": {
Expand Down Expand Up @@ -45,13 +54,6 @@
}
}
],
"unified_proxy": {
"http": {
"static_headers": {
"x": "y"
}
}
},
"websocket": {
"write_timeout": "2s"
}
Expand Down
Loading

0 comments on commit dba55cf

Please sign in to comment.