lecture-03 Updated kafka brokerlist for flume configmap #49

Merged 5 commits on Aug 27, 2024
2 changes: 1 addition & 1 deletion infrastructure/images.txt
@@ -11,4 +11,4 @@ confluentinc/cp-ksqldb-server:7.3.1
confluentinc/cp-ksqldb-cli:7.3.1
redpandadata/console:v2.7.1
dvoros/sqoop:latest
bde2020/flume
bde2020/flume:latest
2 changes: 1 addition & 1 deletion lectures/01/exercises.md
@@ -163,7 +163,7 @@ Now that you have deployed the application the next step is to try and connect t
Connecting to a service can be done in different ways. The simplest way to connect to a service inside the cluster from your localhost is by running the `kubectl port-forward` command.

#### Use `kubectl port-forward`
To use the `kubectl port-forward` command, run the following command `kubectl port-forward svc/<name of service> <localhost port>:<service port>`. The command will open a port on your localhost and forward traffic to a port on the service. In this case, use port `8080` for both your localhost and service (`kubectl port-forward svc/hello-kubernetes 8080:8080`).
To use the `kubectl port-forward` command, run the following command `kubectl port-forward svc/<name of service> <localhost port>:<service port>`. The command will open a port on your localhost and forward traffic to a port on the service. In this case, use port `8080` for both your localhost and service (`kubectl port-forward svc/hello-kubernetes 8080`).
Feel free to change the mapping to something different, for example `9000:8080` to use port 9000 on your localhost and forward the traffic to port 8080 on the service. Open a browser and navigate to [http://localhost:8080](http://localhost:8080) to access your deployed application.
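For instance, the alternative mapping mentioned above would look like this (same service and ports as in the exercise, just a different local port):

```bash
# Forward local port 9000 to port 8080 on the hello-kubernetes service;
# the app is then reachable at http://localhost:9000 instead of :8080.
kubectl port-forward svc/hello-kubernetes 9000:8080
```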


10 changes: 3 additions & 7 deletions lectures/03/exercises.md
@@ -28,7 +28,7 @@ We will use the [Redpanda Console](https://redpanda.com/redpanda-console-kafka-u
kubectl apply -f redpanda.yaml
```

**Task**: Access Redpanda using the following command: `kubectl port-forward svc/redpanda 8080:8080` to open [Redpanda](http://127.0.0.1:8080) in your browser!
**Task**: Access Redpanda using the following command: `kubectl port-forward svc/redpanda 8080` to open [Redpanda](http://127.0.0.1:8080) in your browser!

**Task**: Explore the following tabs:
- [Overview](http://127.0.0.1:8080/overview)
@@ -69,7 +69,7 @@ Besides the Redpanda console, we will also use Kafka Connect, Kafka Schema Regis

The list below summarises the extra services and briefly demonstrates how to interact with them:
- Registry (kafka-schema-registry)
- `kubectl port-forward svc/kafka-schema-registry 8081:8081`. Run a `curl` command in a terminal against the URL [http://127.0.0.1:8081](http://127.0.0.1:8081) and you should get this output:
- `kubectl port-forward svc/kafka-schema-registry 8081`. Run a `curl` command in a terminal against the URL [http://127.0.0.1:8081](http://127.0.0.1:8081) and you should get this output:
```
curl http://127.0.0.1:8081
{}%
@@ -121,12 +121,8 @@ kubectl exec --tty -i kafka-client -- bash
```
3. Run the following commands in the first terminal to produce messages to the Kafka topic `test`:
```bash
# Define the namespace variable
NAMESPACE="<your_namespace>"

# Use the variable in your Kafka producer command
kafka-console-producer.sh \
--broker-list kafka-controller-0.kafka-controller-headless.${NAMESPACE}.svc.cluster.local:9092,kafka-controller-1.kafka-controller-headless.${NAMESPACE}.svc.cluster.local:9092,kafka-controller-2.kafka-controller-headless.${NAMESPACE}.svc.cluster.local:9092 \
--broker-list kafka-controller-0.kafka-controller-headless:9092,kafka-controller-1.kafka-controller-headless:9092,kafka-controller-2.kafka-controller-headless:9092 \
--topic test
```
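To read the messages back, a second terminal attached to the same kafka-client pod can run the matching console consumer. This is only a sketch (the consumer step is not shown in this hunk); it assumes the `test` topic and the same headless broker address used above:

```bash
# Consume the `test` topic from the beginning; the console consumer takes
# the broker addresses via --bootstrap-server.
kafka-console-consumer.sh \
  --bootstrap-server kafka-controller-0.kafka-controller-headless:9092 \
  --topic test \
  --from-beginning
```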

2 changes: 1 addition & 1 deletion lectures/03/flume.yaml
@@ -33,7 +33,7 @@ data:
flume-agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
flume-agent.sinks.kafkaSink.kafka.topic = flume-logs
flume-agent.sinks.kafkaSink.kafka.bootstrap.servers = kafka:9092
flume-agent.sinks.kafkaSink.kafka.brokerList = strimzi-kafka-bootstrap.kafka:9092
flume-agent.sinks.kafkaSink.kafka.brokerList = kafka-controller-0.kafka-controller-headless:9092, kafka-controller-1.kafka-controller-headless:9092, kafka-controller-2.kafka-controller-headless:9092
flume-agent.sinks.kafkaSink.flumeBatchSize = 20
flume-agent.sinks.kafkaSink.kafka.producer.acks = 1
flume-agent.sinks.kafkaSink.producer.acks = 1
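The updated `brokerList` relies on the per-broker headless service names resolving from within the same namespace as Flume and Kafka. A quick sanity check (a sketch, assuming the kafka-client pod from the exercises is still running) is to list topics through one of those addresses:

```bash
# If DNS resolution and broker connectivity work, this prints the cluster's
# topic list; a failure here points at the address rather than the Flume config.
kubectl exec --tty -i kafka-client -- kafka-topics.sh \
  --bootstrap-server kafka-controller-0.kafka-controller-headless:9092 \
  --list
```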
14 changes: 7 additions & 7 deletions lectures/03/hints/client.py
@@ -1,24 +1,24 @@
from kafka import KafkaProducer, KafkaConsumer
import json
from data_model import generate_sample, PackageObj

KAFKA_BROKERS: str = (
"strimzi-kafka-bootstrap.kafka:9092" # <service name>.<namepsace>:<port>
)
from data_model import PackageObj, generate_sample
from kafka import KafkaConsumer, KafkaProducer

# Format <pod name>.<service name>:<port>
KAFKA_BOOTSTRAP: list[str] = ["kafka:9092"]

DEFAULT_TOPIC: str = "INGESTION"
DEFAULT_ENCODING: str = "utf-8"
DEFAULT_CONSUMER: str = "DEFAULT_CONSUMER"


def get_producer() -> KafkaProducer:
return KafkaProducer(bootstrap_servers=[KAFKA_BROKERS])
return KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)


def get_consumer(topic: str, group_id: str = None) -> KafkaConsumer:
if group_id is None:
group_id = DEFAULT_CONSUMER
return KafkaConsumer(topic, bootstrap_servers=[KAFKA_BROKERS], group_id=group_id)
return KafkaConsumer(topic, bootstrap_servers=KAFKA_BOOTSTRAP, group_id=group_id)


def send_msg(value, key: str, topic: str, producer: KafkaProducer) -> None: