Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Feature/amqp #7

Draft
wants to merge 48 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
f3d2f8a
Added support for oauth2 tokens in opensearch output sink
cthtrifork Feb 29, 2024
86a1304
Added direct support for oauth2 tokens in kafka using franz-lib
cthtrifork Feb 29, 2024
8f95960
updated docs
cthtrifork Feb 29, 2024
d33a1e6
Typo
cthtrifork Feb 29, 2024
02c198e
Fix nullpointer issue
cthtrifork Feb 29, 2024
6eccd2b
mark static access token as secret
cthtrifork Feb 29, 2024
670ff21
mark static token as secret
cthtrifork Feb 29, 2024
360bf95
did not check if token is empty and not just non-existing
cthtrifork Mar 1, 2024
8fef0af
Merge branch 'feature/kafka-oauth2endpoint-support' into feature/oauth2
cthtrifork Mar 1, 2024
7b940cd
forgot to describe oauth2.enabled
cthtrifork Mar 3, 2024
d49ccef
Merge branch 'feature/opensearch-oauth2-token-support' into feature/o…
cthtrifork Mar 3, 2024
d8d1651
devcontainer support
cthtrifork Mar 4, 2024
c54fa47
oauth2 support with clientcredentials
cthtrifork Apr 10, 2024
83ae739
Added support for oauth2 tokens in opensearch output sink
cthtrifork Feb 29, 2024
87b6b19
Typo
cthtrifork Feb 29, 2024
f05f6b0
Fix nullpointer issue
cthtrifork Feb 29, 2024
719301f
mark static access token as secret
cthtrifork Feb 29, 2024
321a030
Added direct support for oauth2 tokens in kafka using franz-lib
cthtrifork Feb 29, 2024
0ca6c12
updated docs
cthtrifork Feb 29, 2024
8b98e25
mark static token as secret
cthtrifork Feb 29, 2024
1a0d3b0
did not check if token is empty and not just non-existing
cthtrifork Mar 1, 2024
21df213
forgot to describe oauth2.enabled
cthtrifork Mar 3, 2024
bee5c53
devcontainer support
cthtrifork Mar 4, 2024
c8b730b
Merge branch 'feature/oauth2-opensearch-direct' into cheetah-main
cthtrifork Apr 10, 2024
ddfd5d6
prepare release flow
cthtrifork Apr 10, 2024
0fbb75e
try fix workflow
cthtrifork Apr 10, 2024
2362090
fixes
cthtrifork Apr 10, 2024
db6e4dc
fix IMAGE_NAME
cthtrifork Apr 10, 2024
7bc1073
go mod tidy
cthtrifork Apr 10, 2024
c25a321
GOLANGCI_LINT_VERSION="1.57.1"
cthtrifork Apr 10, 2024
7743a61
linting
cthtrifork Apr 10, 2024
abdaa53
increase timeout
cthtrifork Apr 10, 2024
27ea171
initial work with GET support
cthtrifork Apr 16, 2024
d4c5610
docs
cthtrifork Apr 16, 2024
0c4342b
wrote down some thoughts
cthtrifork Apr 16, 2024
f7a5a35
rename collection to index
cthtrifork Apr 16, 2024
442cbb5
move stuff around
cthtrifork Apr 17, 2024
9548fbc
fix compilation issues
cthtrifork Apr 17, 2024
04cf0c8
made the cache more flexible
cthtrifork Apr 17, 2024
4b1cb0b
supported all operators
cthtrifork Apr 17, 2024
2beb25b
simplify release flow
cthtrifork Apr 17, 2024
c38417f
remove tmp secret
cthtrifork Apr 17, 2024
ddeba9e
MarshalJSON
cthtrifork Apr 17, 2024
c4714dd
Merge pull request #4 from trifork/feature/opensearch-cache
cthtrifork Apr 17, 2024
2f4c757
added opensearch ping
cthtrifork Apr 18, 2024
fa70b9c
tested multilevel better
cthtrifork Apr 18, 2024
a7437c6
Merge pull request #5 from trifork/feature/opensearch-cache
cthtrifork Apr 18, 2024
c5e4367
save work
cthtrifork May 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
FROM mcr.microsoft.com/vscode/devcontainers/go:1.21

ARG NODE_VERSION="16"
ARG GOLANGCI_LINT_VERSION="1.57.1"

# https://github.com/microsoft/vscode-dev-containers/blob/main/containers/go/.devcontainer/base.Dockerfile
ENV USERNAME=vscode
ENV LIBRARY_SCRIPTS_SRC="https://raw.githubusercontent.com/microsoft/vscode-dev-containers/main/containers/go/.devcontainer/library-scripts/node-debian.sh"
ENV NVM_DIR=/usr/local/share/nvm
ENV NVM_SYMLINK_CURRENT=true \
PATH=${NVM_DIR}/current/bin:${PATH}
RUN mkdir /tmp/library-scripts \
&& curl -fsSL -o /tmp/library-scripts/node-debian.sh "${LIBRARY_SCRIPTS_SRC}"
RUN bash /tmp/library-scripts/node-debian.sh "${NVM_DIR}" "${NODE_VERSION}" "${USERNAME}" \
&& apt-get clean -y && rm -rf /var/lib/apt/lists/* \
&& rm -rf /tmp/library-scripts

RUN echo 'deb [trusted=yes] https://repo.goreleaser.com/apt/ /' \
| tee /etc/apt/sources.list.d/goreleaser.list
RUN apt-get update \
&& export DEBIAN_FRONTEND=noninteractive \
&& apt-get -y install --no-install-recommends goreleaser

RUN curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh \
| sh -s -- -b $(go env GOPATH)/bin v$GOLANGCI_LINT_VERSION

USER vscode
WORKDIR /home/vscode

RUN mkdir -p .config/git \
&& echo ".vscode/*" >> .config/git/ignore \
&& echo "*.code-workspace" >> .config/git/ignore \
&& echo ".history/" >> .config/git/ignore
33 changes: 33 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"name": "Benthos Dev Container",
"build": {
"dockerfile": "Dockerfile",
"args": {
"NODE_VERSION": "16",
"GOLANGCI_LINT_VERSION": "1.46.2"
}
},
"runArgs": ["--network=host", "--privileged"],
"customizations": {
// Configure properties specific to VS Code.
"vscode": {
"extensions": [
"golang.Go",
"dbaeumer.vscode-eslint",
"EditorConfig.EditorConfig",
"esbenp.prettier-vscode",
"github.vscode-github-actions",
"jebbs.plantuml",
"GitHub.copilot",
"github.vscode-github-actions",
"ms-vscode.makefile-tools",
"GeorgesHaidar.vsc-benthos"
]
}
},
"features": {
"ghcr.io/devcontainers/features/docker-in-docker:2": {}
},
"postCreateCommand": "go mod download",
"remoteUser": "vscode"
}
41 changes: 41 additions & 0 deletions .github/workflows/cheetah_release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Release cheetah-benthos

on:
workflow_dispatch:

push:
branches: ["cheetah-main"]
tags:
- "v*"
pull_request:
branches: ["cheetah-main"]

env:
IMAGE_NAME: ${{ github.repository }}
DOCKERFILE_PATH: ./resources/docker/Dockerfile
CONTEXT: ./

permissions:
contents: read
packages: write

jobs:
push_image:
name: "Build and push image"
runs-on: ubuntu-latest
timeout-minutes: 30

steps:
- name: Checkout repository
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4

- name: Build and push
uses: trifork/cheetah-infrastructure-utils-workflows/.github/actions/build-image/default@main
with:
read_package_pat: ${{ secrets.PACKAGE_PAT }} # we need this, as GITHUB_TOKEN only have permission to its own repo
context: ${{ env.CONTEXT }}
image_name: ${{ env.IMAGE_NAME }}
github_run_id: ${{ github.run_id }}
dockerfile_path: ${{ env.DOCKERFILE_PATH }}
push_image: ${{ (github.event_name == 'pull_request' || github.ref_type == 'tag' || github.event_name == 'workflow_dispatch') && 'true' || 'false' }}
upload_image: "false"
120 changes: 108 additions & 12 deletions internal/impl/kafka/sasl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ package kafka

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/IBM/sarama"

Expand Down Expand Up @@ -43,6 +49,18 @@ func saslField() *service.ConfigField {
service.NewStringField("token").
Description("The token to use for a single session's OAUTHBEARER authentication.").
Default(""),
service.NewStringField("tokenEndpoint").
Description("The endpoint to use for OAUTHBEARER token acquisition.").
Default(""),
service.NewStringField("clientId").
Description("The client ID to use for OAUTHBEARER token acquisition.").
Default(""),
service.NewStringField("clientSecret").
Description("The client secret to use for OAUTHBEARER token acquisition.").
Default("").Secret(),
service.NewStringField("scope").
Description("The scope to use for OAUTHBEARER token acquisition.").
Default(""),
service.NewStringMapField("extensions").
Description("Key/value pairs to add to OAUTHBEARER authentication requests.").
Optional(),
Expand Down Expand Up @@ -130,21 +148,98 @@ func plainSaslFromConfig(c *service.ParsedConfig) (sasl.Mechanism, error) {

func oauthSaslFromConfig(c *service.ParsedConfig) (sasl.Mechanism, error) {
token, err := c.FieldString("token")

if err != nil && token != "" {
var extensions map[string]string
if c.Contains("extensions") {
if extensions, err = c.FieldStringMap("extensions"); err != nil {
return nil, err
}
}
return oauth.Oauth(func(c context.Context) (oauth.Auth, error) {
return oauth.Auth{
Token: token,
Extensions: extensions,
}, nil
}), nil
} else if c.Contains("tokenEndpoint") {
return oauth.Oauth(func(ctx context.Context) (oauth.Auth, error) {
shortToken, err := acquireToken(ctx, c)
return oauth.Auth{Token: shortToken}, err
}), nil
}
return nil, errors.New("field 'token' or 'tokenEndpoint' was not found in the config")
}

func acquireToken(ctx context.Context, c *service.ParsedConfig) (string, error) {

tokenEndpoint, err := c.FieldString("tokenEndpoint")
if err != nil {
return nil, err
return "", err
}
var extensions map[string]string
if c.Contains("extensions") {
if extensions, err = c.FieldStringMap("extensions"); err != nil {
return nil, err
}

clientID, err := c.FieldString("clientId")
if err != nil {
return "", err
}
return oauth.Oauth(func(c context.Context) (oauth.Auth, error) {
return oauth.Auth{
Token: token,
Extensions: extensions,
}, nil
}), nil

clientSecret, err := c.FieldString("clientSecret")
if err != nil {
return "", err
}

scope, err := c.FieldString("scope")
if err != nil {
return "", err
}

authHeaderValue := base64.StdEncoding.EncodeToString([]byte(clientID + ":" + clientSecret))

queryParams := url.Values{}
queryParams.Set("grant_type", "client_credentials")
queryParams.Set("scope", scope)

req, err := http.NewRequestWithContext(ctx, "POST", tokenEndpoint, strings.NewReader(queryParams.Encode()))
if err != nil {
return "", err
}

req.URL.RawQuery = queryParams.Encode()

req.Header.Set("Authorization", "Basic "+authHeaderValue)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return "", err
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

if err := resp.Body.Close(); err != nil {
return "", err
}

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("token request failed with status code %d", resp.StatusCode)
}

var tokenResponse map[string]interface{}
err = json.Unmarshal(body, &tokenResponse)
if err != nil {
return "", fmt.Errorf("failed to parse token response: %s", err)
}

accessToken, ok := tokenResponse["access_token"].(string)
if !ok {
return "", errors.New("access_token not found in token response")
}

return accessToken, nil
}

func scram256SaslFromConfig(c *service.ParsedConfig) (sasl.Mechanism, error) {
Expand Down Expand Up @@ -223,6 +318,7 @@ func SaramaSASLField() *service.ConfigField {
Secret(),
service.NewStringField(saramaFieldSASLAccessToken).
Description("A static OAUTHBEARER access token").
Secret().
Default(""),
service.NewStringField(saramaFieldSASLTokenCache).
Description("Instead of using a static `access_token` allows you to query a [`cache`](/docs/components/caches/about) resource to fetch OAUTHBEARER tokens from").
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/kafka/sasl/sasl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func FieldSpec() docs.FieldSpec {
),
docs.FieldString("user", "A PLAIN username. It is recommended that you use environment variables to populate this field.", "${USER}"),
docs.FieldString("password", "A PLAIN password. It is recommended that you use environment variables to populate this field.", "${PASSWORD}").Secret(),
docs.FieldString("access_token", "A static OAUTHBEARER access token"),
docs.FieldString("access_token", "A static OAUTHBEARER access token").Secret(),
docs.FieldString("token_cache", "Instead of using a static `access_token` allows you to query a [`cache`](/docs/components/caches/about) resource to fetch OAUTHBEARER tokens from"),
docs.FieldString("token_key", "Required when using a `token_cache`, the key to query the cache with for tokens."),
).Advanced()
Expand Down
Loading