Skip to content

Commit

Permalink
Merge #140456
Browse files Browse the repository at this point in the history
140456: changefeedccl: fix proprietary OAuth SASL mechanism registration r=wenyihu6 a=asg0451

Fix proprietary oauth sasl not being registered due to bad option validation.

Epic: none
Release note: None


Co-authored-by: Miles Frankel <[email protected]>
  • Loading branch information
craig[bot] and asg0451 committed Feb 4, 2025
2 parents 29a190a + 23b581b commit e8b932d
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kafkaauth/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_test(
srcs = ["sasl_proprietary_oauth_test.go"],
embed = [":kafkaauth"],
deps = [
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/util/leaktest",
"//pkg/util/log",
"@com_github_stretchr_testify//assert",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kafkaauth/kafkaauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (r saslMechanismRegistry) pick(u *changefeedbase.SinkURL) (_ SASLMechanism,
}

// Return slightly nicer errors for this common case.
if b.name() != sarama.SASLTypeOAuth {
if b.name() != sarama.SASLTypeOAuth && b.name() != proprietaryOAuthName {
if err := validateNoOAuthOnlyParams(u); err != nil {
return nil, false, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/kafkaauth/sasl_proprietary_oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"golang.org/x/oauth2"
)

const proprietaryOAuthName = "PROPRIETARY_OAUTH"

type saslProprietaryOAuthBuilder struct{}

// name implements authMechanismBuilder.
func (s saslProprietaryOAuthBuilder) name() string {
return "PROPRIETARY_OAUTH"
return proprietaryOAuthName
}

// validateParams implements authMechanismBuilder.
Expand Down
22 changes: 22 additions & 0 deletions pkg/ccl/changefeedccl/kafkaauth/sasl_proprietary_oauth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -82,3 +83,24 @@ func TestProprietaryTokenSource(t *testing.T) {
assert.Equal(t, tokResp.AccessToken, tok.AccessToken)
assert.WithinRange(t, tok.Expiry, start.Add(3600*time.Second), start.Add(3700*time.Second))
}

func TestProprietaryOAuthRegistration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

u, err := url.Parse(`kafka://idk?sasl_enabled=true&sasl_mechanism=PROPRIETARY_OAUTH&sasl_client_id=cl&sasl_token_url=localhost&sasl_proprietary_resource=r&sasl_proprietary_client_assertion_type=at&sasl_proprietary_client_assertion=as`)
require.NoError(t, err)
su := &changefeedbase.SinkURL{URL: u}
mech, ok, err := Pick(su)
require.NoError(t, err)
require.True(t, ok)
require.NotNil(t, mech)
om, ok := mech.(*saslProprietaryOAuth)
require.True(t, ok)
require.Empty(t, su.RemainingQueryParams())
require.Equal(t, "cl", om.clientID)
require.Equal(t, "localhost", om.tokenURL)
require.Equal(t, "r", om.resource)
require.Equal(t, "at", om.clientAssertionType)
require.Equal(t, "as", om.clientAssertion)
}

0 comments on commit e8b932d

Please sign in to comment.