Skip to content

Commit

Permalink
lecture-03 Updated kafka brokerlist for flume configmap (#49)
Browse files Browse the repository at this point in the history
* lecture-03 Updated kafka brokerlist for flume configmap

* lecture-03 Updated docs and client.py to reflect the correct kafka brokers

* lecture-03 Update flume docker image version

* 8080:8080 -> 8080 and 8081:8081 -> 8081

* KAFKA_BROKERS -> KAFKA_BOOTSTRAP

---------

Co-authored-by: Anders Launer Baek-Petersen <[email protected]>
  • Loading branch information
Svane20 and anderslaunerbaek authored Aug 27, 2024
1 parent 6d43690 commit b9b996c
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 17 deletions.
2 changes: 1 addition & 1 deletion infrastructure/images.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ confluentinc/cp-ksqldb-server:7.3.1
confluentinc/cp-ksqldb-cli:7.3.1
redpandadata/console:v2.7.1
dvoros/sqoop:latest
bde2020/flume
bde2020/flume:latest
2 changes: 1 addition & 1 deletion lectures/01/exercises.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ Now that you have deployed the application the next step is to try and connect t
Connecting to a service can be done in different ways. The simplest way to connect to a service inside the cluster and to your localhost is by running the `kubectl port-forward` command.

#### Use `kubectl port-forward`
To use the `kubectl port-forward` command, run the following command `kubectl port-forward svc/<name of service> <localhost port>:<service port>`. The command will open a port on your localhost and forward traffic to a port on the service. In this case, use port `8080` for both your localhost and service (`kubectl port-forward svc/hello-kubernetes 8080:8080`).
To use the `kubectl port-forward` command, run the following command `kubectl port-forward svc/<name of service> <localhost port>:<service port>`. The command will open a port on your localhost and forward traffic to a port on the service. In this case, use port `8080` for both your localhost and service (`kubectl port-forward svc/hello-kubernetes 8080`).
Please feel free to change it to something different. Example to `9000:8080` to use port 9000 on your localhost and forward the traffic to port 8080 on the service. Open a browser and navigate to [http://localhost:8080](http://localhost:8080) to access your deployed application.


Expand Down
10 changes: 3 additions & 7 deletions lectures/03/exercises.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ We will use the [Redpanda Console](https://redpanda.com/redpanda-console-kafka-u
kubectl apply -f redpanda.yaml
```

**Task**: Access Redpanda using the following command: `kubectl port-forward svc/redpanda 8080:8080` to open [Redpanda](http://127.0.0.1:8080) in your browser!
**Task**: Access Redpanda using the following command: `kubectl port-forward svc/redpanda 8080` to open [Redpanda](http://127.0.0.1:8080) in your browser!

**Task**: Explore the following tabs:
- [Overview](http://127.0.0.1:8080/overview)
Expand Down Expand Up @@ -69,7 +69,7 @@ Besides the Redpanda console, we will also use Kafka Connect, Kafka Schema Regis
The list below summarises the extra services and briefly demonstrate how to interact with them:
- Registry (kafka-schema-registry)
- `kubectl port-forward svc/kafka-schema-registry 8081:8081`. Make a `curl` cmd in a terminal using the URL [http://127.0.0.1:8081](http://127.0.0.1:8081) and get this output:
- `kubectl port-forward svc/kafka-schema-registry 8081`. Make a `curl` cmd in a terminal using the URL [http://127.0.0.1:8081](http://127.0.0.1:8081) and get this output:
```
curl http://127.0.0.1:8081
{}%
Expand Down Expand Up @@ -121,12 +121,8 @@ kubectl exec --tty -i kafka-client -- bash
```
3. Run the following commands in the first terminal to produce messages to the Kafka topic `test`:
```bash
# Define the namespace variable
NAMESPACE="<your_namespace>"
# Use the variable in your Kafka producer command
kafka-console-producer.sh \
--broker-list kafka-controller-0.kafka-controller-headless.${NAMESPACE}.svc.cluster.local:9092,kafka-controller-1.kafka-controller-headless.${NAMESPACE}.svc.cluster.local:9092,kafka-controller-2.kafka-controller-headless.${NAMESPACE}.svc.cluster.local:9092 \
--broker-list kafka-controller-0.kafka-controller-headless:9092,kafka-controller-1.kafka-controller-headless:9092,kafka-controller-2.kafka-controller-headless:9092 \
--topic test
```
Expand Down
2 changes: 1 addition & 1 deletion lectures/03/flume.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ data:
flume-agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
flume-agent.sinks.kafkaSink.kafka.topic = flume-logs
flume-agent.sinks.kafkaSink.kafka.bootstrap.servers = kafka:9092
flume-agent.sinks.kafkaSink.kafka.brokerList = strimzi-kafka-bootstrap.kafka:9092
flume-agent.sinks.kafkaSink.kafka.brokerList = kafka-controller-0.kafka-controller-headless:9092, kafka-controller-1.kafka-controller-headless:9092, kafka-controller-2.kafka-controller-headless:9092
flume-agent.sinks.kafkaSink.flumeBatchSize = 20
flume-agent.sinks.kafkaSink.kafka.producer.acks = 1
flume-agent.sinks.kafkaSink.producer.acks = 1
Expand Down
14 changes: 7 additions & 7 deletions lectures/03/hints/client.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
from kafka import KafkaProducer, KafkaConsumer
import json
from data_model import generate_sample, PackageObj

KAFKA_BROKERS: str = (
"strimzi-kafka-bootstrap.kafka:9092" # <service name>.<namepsace>:<port>
)
from data_model import PackageObj, generate_sample
from kafka import KafkaConsumer, KafkaProducer

# Format <pod name>.<service name>:<port>
KAFKA_BOOTSTRAP: list[str] = ["kafka:9092"]

DEFAULT_TOPIC: str = "INGESTION"
DEFAULT_ENCODING: str = "utf-8"
DEFAULT_CONSUMER: str = "DEFAULT_CONSUMER"


def get_producer() -> KafkaProducer:
return KafkaProducer(bootstrap_servers=[KAFKA_BROKERS])
return KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)


def get_consumer(topic: str, group_id: str = None) -> KafkaConsumer:
if group_id is None:
group_id = DEFAULT_CONSUMER
return KafkaConsumer(topic, bootstrap_servers=[KAFKA_BROKERS], group_id=group_id)
return KafkaConsumer(topic, bootstrap_servers=KAFKA_BOOTSTRAP, group_id=group_id)


def send_msg(value, key: str, topic: str, producer: KafkaProducer) -> None:
Expand Down

0 comments on commit b9b996c

Please sign in to comment.