Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

docker compose up kafka-to-http http-to-kafka #8

Draft
wants to merge 1 commit into
base: cheetah-main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions resources/docker/kafka/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,26 @@ services:
command: [ '-w', '-c', '/benthos.yaml' ]
volumes:
- ./out.yaml:/benthos.yaml
network_mode: host

kafka-to-http:
image: jeffail/benthos:local
build:
context: ../../../
dockerfile: ./resources/docker/Dockerfile
command: [ '-w', '-c', '/benthos.yaml' ]
ports:
- "4195:4195"
volumes:
- ./http.yaml:/benthos.yaml
network_mode: host

http-to-kafka:
image: jeffail/benthos:local
build:
context: ../../../
dockerfile: ./resources/docker/Dockerfile
command: [ '-w', '-c', '/benthos.yaml' ]
volumes:
- ./stream_in.yaml:/benthos.yaml
network_mode: host
48 changes: 48 additions & 0 deletions resources/docker/kafka/http.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# todo: read until <https://www.benthos.dev/docs/components/inputs/read_until> ?
http:
address: 0.0.0.0:4195
enabled: true
root_path: /kafka
debug_endpoints: false

logger:
level: DEBUG
format: logfmt
add_timestamp: true
static_fields:
"@service": benthos

input:
kafka_franz:
seed_brokers: [localhost:9092]
topics: [JobNameInputTopic]
consumer_group: benthos2
sasl:
- mechanism: OAUTHBEARER
tokenEndpoint: http://localhost:1852/realms/local-development/protocol/openid-connect/token
scope: "kafka"
clientId: "default-access"
clientSecret: "default-access-secret"

pipeline:
processors:
- bloblang: |
root = this
meta "content-type" = "application/json"

- catch:
- log:
level: ERROR
message: ${! error() }
- bloblang: root = deleted()

output:
label: "http_out"
http_server:
address: "" # global
timeout: 5s
path: /get
stream_path: /get/stream
ws_path: /get/ws
allowed_verbs:
- GET
33 changes: 33 additions & 0 deletions resources/docker/kafka/stream_in.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
http:
enabled: false

input:
label: ""
http_client:
url: "http://localhost:4195/kafka/get/stream"
verb: GET
headers: {}
rate_limit: ""
timeout: 5s
payload: ""
drop_on:
- 408 # timeout e.g no more data.
stream:
enabled: true
reconnect: true
scanner:
lines: {} # todo
auto_replay_nacks: true

output:
label: sink
kafka_franz:
seed_brokers: [localhost:9092] # No default (required)
topic: localstream_in # No default (required)
key: "" # No default (optional)
sasl:
- mechanism: OAUTHBEARER
tokenEndpoint: http://localhost:1852/realms/local-development/protocol/openid-connect/token
scope: "kafka"
clientId: "default-access"
clientSecret: "default-access-secret"
Loading