Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update kafka-python to 2.0.2 to add support for python 3.8 #16

Merged
merged 6 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.1.0

* Update to kafka-python 2.0.2 to support python 3.8. #16 by @murthysrd

## 1.0.0

* Drop Python 2.7 support
Expand Down
41 changes: 27 additions & 14 deletions actions/produce.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""
Kafka Producer Action for stackstorm
"""
from st2common.runners.base_action import Action
from kafka import SimpleProducer, KafkaClient
from kafka.util import kafka_bytestring
from kafka import KafkaProducer, KafkaConsumer


class ProduceMessageAction(Action):
"""
Action to send messages to Apache Kafka system.
"""
DEFAULT_CLIENT_ID = 'st2-kafka-producer'

DEFAULT_CLIENT_ID = "st2-kafka-producer"

def run(self, topic, message, hosts=None):
"""
Expand All @@ -28,18 +31,28 @@ def run(self, topic, message, hosts=None):

if hosts:
_hosts = hosts
elif self.config.get('hosts', None):
_hosts = self.config['hosts']
elif self.config.get("hosts", None):
_hosts = self.config["hosts"]
else:
raise ValueError("Need to define 'hosts' in either action or in config")

# set default for empty value
_client_id = self.config.get('client_id') or self.DEFAULT_CLIENT_ID

client = KafkaClient(_hosts, client_id=_client_id)
client.ensure_topic_exists(topic)
producer = SimpleProducer(client)
result = producer.send_messages(topic, kafka_bytestring(message))

if result[0]:
return result[0]._asdict()
_client_id = self.config.get("client_id") or self.DEFAULT_CLIENT_ID

consumer = KafkaConsumer(
bootstrap_servers=_hosts.split(","), client_id=_client_id
)
try:
assert topic in consumer.topics()
except AssertionError as exc:
raise AssertionError(f"{topic} does not exist.") from exc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a direct replacement for ensure_topic_exists() because: if the broker is configured to create topics automatically, then that will no longer happen here. Maybe this should actually just be a major version bump.

Also, apparently we have to update the sensors too.


producer = KafkaProducer(
bootstrap_servers=_hosts.split(","),
client_id=_client_id,
value_serializer=lambda m: m.encode("utf-8"),
max_request_size=10485760,
)
future = producer.send(topic, message)
record_metadata = future.get(timeout=10) # TODO: Make this timeout an input param
return record_metadata._asdict()
2 changes: 1 addition & 1 deletion pack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ keywords:
- base64
- stackdriver
- google cloud
version: 1.0.0
version: 1.1.0
author: StackStorm
email: [email protected]
Contributors:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
kafka-python>=0.9.4,<1.0
kafka-python==2.0.2
Loading