This project demonstrates the integration of Apache Kafka, Apache NiFi, and a Python producer/consumer built with the `confluent_kafka` client library.
## Prerequisites

- Apache Kafka
- Apache ZooKeeper
- Apache NiFi
- Python 3.x
### Step 1: Set Up Kafka

- Download and extract Kafka.
- Start ZooKeeper:
  ```bash
  bin/zookeeper-server-start.sh config/zookeeper.properties
  ```
- Start Kafka:
  ```bash
  bin/kafka-server-start.sh config/server.properties
  ```
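No broker configuration changes are needed for this walkthrough. For orientation, these are the relevant defaults from a stock `config/server.properties` (exact contents vary by Kafka version) — a single broker listening on port 9092, which matches the `localhost:9092` address used throughout:

```properties
# Relevant defaults from a stock config/server.properties (version-dependent)
broker.id=0
# Commented out by default; the broker still binds port 9092
#listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
```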
### Step 2: Set Up NiFi

- Download and extract NiFi.
- Start NiFi:
  ```bash
  bin/nifi.sh start
  ```
- Create a NiFi flow with `GenerateFlowFile` and `PublishKafka_2_6` processors.
- Configure `PublishKafka_2_6` with the following properties:
  - Kafka Brokers: `localhost:9092`
  - Topic Name: `real-time-data`
### Step 3: Set Up the Python Scripts

- Create and activate a virtual environment:
  ```bash
  python3 -m venv kafka-env
  source kafka-env/bin/activate
  ```
- Install `confluent_kafka`:
  ```bash
  pip install confluent_kafka
  ```
- Create `producer.py` and `consumer.py` with the following contents:

#### `producer.py`
```python
from confluent_kafka import Producer
import json

conf = {
    'bootstrap.servers': 'localhost:9092'
}

p = Producer(**conf)

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

topic = 'real-time-data'
message = {'key': 'value'}

p.produce(topic, key='key', value=json.dumps(message), callback=delivery_report)
p.poll(1)
p.flush()
```
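The producer serializes its payload with `json.dumps` before sending, because Kafka message values are raw bytes. If you extend the script with more record types, a pair of helpers keeps serialization consistent on both ends; this is a sketch with illustrative names, not part of the repository's scripts:

```python
import json

def encode_record(record: dict) -> bytes:
    """Serialize a record to UTF-8 JSON bytes, the form Kafka stores."""
    return json.dumps(record, sort_keys=True).encode('utf-8')

def decode_record(raw: bytes) -> dict:
    """Inverse of encode_record, for the consuming side."""
    return json.loads(raw.decode('utf-8'))

# Round-trip check that needs no running broker:
sample = {'sensor': 'temp-1', 'value': 21.5}
assert decode_record(encode_record(sample)) == sample
```

Sorting keys makes the byte output deterministic, which helps when comparing or deduplicating messages downstream.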
#### `consumer.py`
```python
from confluent_kafka import Consumer, KafkaException, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
}

c = Consumer(**conf)
topic = 'real-time-data'
c.subscribe([topic])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                raise KafkaException(msg.error())
        print(f'Received message: {msg.value().decode("utf-8")}')
except KeyboardInterrupt:
    pass
finally:
    c.close()
```
### Step 4: Run the Scripts
1. Run the producer:
```bash
python producer.py
```
2. In a separate terminal, run the consumer:
```bash
python consumer.py
```
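The branching inside `delivery_report` can be exercised without a running broker by standing in for the message object. `FakeMsg` below is a hypothetical test double, and this variant of the callback returns its text instead of printing it so the result is easy to assert on:

```python
# Throwaway stand-in for confluent_kafka's Message, for local testing only.
class FakeMsg:
    def topic(self):
        return 'real-time-data'

    def partition(self):
        return 0

def delivery_report(err, msg):
    # Same branching as producer.py, but returning the text instead of printing.
    if err is not None:
        return f'Message delivery failed: {err}'
    return f'Message delivered to {msg.topic()} [{msg.partition()}]'

print(delivery_report(None, FakeMsg()))   # Message delivered to real-time-data [0]
print(delivery_report('timeout', None))   # Message delivery failed: timeout
```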
## Configuring `.gitignore`
A `.gitignore` file is included in the repository to ensure that certain files and directories, such as virtual environments and other environment-specific files, are not tracked by Git.
```plaintext
# .gitignore
kafka-env/
```

## Setup

- Create a virtual environment:
  ```bash
  python3 -m venv kafka-env
  ```
- Activate the virtual environment:
  ```bash
  source kafka-env/bin/activate  # On Windows use `kafka-env\Scripts\activate`
  ```
- Install the requirements:
  ```bash
  pip install -r requirements.txt
  ```
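The contents of `requirements.txt` are not shown above; at minimum it needs the Kafka client package. The entry below is an assumption based on the imports in the scripts — add version pins as appropriate for your environment:

```plaintext
# requirements.txt (minimal; assumed from the scripts' imports)
confluent-kafka
```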
## Project Structure

- `producer.py`: Python script to produce messages to the Kafka topic.
- `consumer.py`: Python script to consume messages from the Kafka topic.
- `requirements.txt`: Python dependencies.
- `.gitignore`: Git ignore file to exclude specific files and directories from being tracked by Git.
- `README.md`: Project documentation.
## License

This project is licensed under the MIT License.