Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update kafka-python to 2.0.2 to add support for python 3.8 #16

Merged
merged 6 commits
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## 2.0.0

* Update to kafka-python 2.0.2 to support python 3.8. #16 by @murthysrd
This is a major version bump as topics will no longer be auto-created
by the sensors or by the produce action even if the Kafka server is
configured to auto-create the topics and partitions.

## 1.0.0

* Drop Python 2.7 support
Expand Down
39 changes: 25 additions & 14 deletions actions/produce.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""
Kafka Producer Action for stackstorm
"""
from st2common.runners.base_action import Action
from kafka import SimpleProducer, KafkaClient
from kafka.util import kafka_bytestring
from kafka import KafkaProducer, KafkaConsumer


class ProduceMessageAction(Action):
"""
Action to send messages to Apache Kafka system.
"""
DEFAULT_CLIENT_ID = 'st2-kafka-producer'

DEFAULT_CLIENT_ID = "st2-kafka-producer"

def run(self, topic, message, hosts=None):
"""
Expand All @@ -28,18 +31,26 @@ def run(self, topic, message, hosts=None):

if hosts:
_hosts = hosts
elif self.config.get('hosts', None):
_hosts = self.config['hosts']
elif self.config.get("hosts", None):
_hosts = self.config["hosts"]
else:
raise ValueError("Need to define 'hosts' in either action or in config")

# set default for empty value
_client_id = self.config.get('client_id') or self.DEFAULT_CLIENT_ID

client = KafkaClient(_hosts, client_id=_client_id)
client.ensure_topic_exists(topic)
producer = SimpleProducer(client)
result = producer.send_messages(topic, kafka_bytestring(message))

if result[0]:
return result[0]._asdict()
_client_id = self.config.get("client_id") or self.DEFAULT_CLIENT_ID

consumer = KafkaConsumer(
bootstrap_servers=_hosts.split(","), client_id=_client_id
)
if topic not in consumer.topics():
raise Exception(f"Topic does not exist: {topic}")

producer = KafkaProducer(
bootstrap_servers=_hosts.split(","),
client_id=_client_id,
value_serializer=lambda m: m.encode("utf-8"),
max_request_size=10485760,
)
future = producer.send(topic, message)
record_metadata = future.get(timeout=10) # TODO: Make this timeout an input param
return record_metadata._asdict()
2 changes: 1 addition & 1 deletion pack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ keywords:
- base64
- stackdriver
- google cloud
version: 1.0.0
version: 2.0.0
author: StackStorm
email: [email protected]
Contributors:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
kafka-python>=0.9.4,<1.0
kafka-python==2.0.2
14 changes: 9 additions & 5 deletions sensors/gcp_message_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,16 @@ def _ensure_topics_existence(self):
"""
Ensure that topics we're listening to exist.

Fetching metadata for a non-existent topic will automatically try to create it
with the default replication factor and number of partitions (default server config).
Otherwise, the Kafka server is not configured to auto-create topics and partitions.
This does not fetch metadata for specific topics, so it will not trigger auto-creation
of topics and partitions even if the Kafka server has the default server config that
allows such auto-creation.
"""
map(self._consumer._client.ensure_topic_exists, self._topics)
self._consumer.set_topic_partitions(*self._topics)
cluster_topics = self._consumer.topics()
missing_topics = [topic for topic in self._topics if topic not in cluster_topics]
if missing_topics:
raise Exception(f"One or more topics do not exist: {', '.join(missing_topics)}")
# topic partitions were already subscribed on KafkaConsumer init
# self._consumer.subscribe(topics=self._topics)

def run(self):
"""
Expand Down
14 changes: 9 additions & 5 deletions sensors/message_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,16 @@ def _ensure_topics_existence(self):
"""
Ensure that topics we're listening to exist.

Fetching metadata for a non-existent topic will automatically try to create it
with the default replication factor and number of partitions (default server config).
Otherwise, the Kafka server is not configured to auto-create topics and partitions.
This does not fetch metadata for specific topics, so it will not trigger auto-creation
of topics and partitions even if the Kafka server has the default server config that
allows such auto-creation.
"""
map(self._consumer._client.ensure_topic_exists, self._topics)
self._consumer.set_topic_partitions(*self._topics)
cluster_topics = self._consumer.topics()
missing_topics = [topic for topic in self._topics if topic not in cluster_topics]
if missing_topics:
raise Exception(f"One or more topics do not exist: {', '.join(missing_topics)}")
# topic partitions were already subscribed on KafkaConsumer init
# self._consumer.subscribe(topics=self._topics)

def run(self):
"""
Expand Down
Loading