Skip to content

Commit

Permalink
fix kafka-python actions for python 3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
Sekhar Sankaramanchi committed Feb 16, 2023
1 parent e8f0708 commit 02c91a4
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Changelog

* Fix kafka-python actions for python 3.8. Fixes #15

## 1.0.0

* Drop Python 2.7 support
Expand Down
41 changes: 27 additions & 14 deletions actions/produce.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
"""
Kafka Producer Action for stackstorm
"""
import json

from st2common.runners.base_action import Action
from kafka import SimpleProducer, KafkaClient
from kafka.util import kafka_bytestring
from kafka import KafkaProducer, KafkaConsumer


class ProduceMessageAction(Action):
"""
Action to send messages to Apache Kafka system.
"""
DEFAULT_CLIENT_ID = 'st2-kafka-producer'

DEFAULT_CLIENT_ID = "st2-kafka-producer"

def run(self, topic, message, hosts=None):
"""
Expand All @@ -28,18 +33,26 @@ def run(self, topic, message, hosts=None):

if hosts:
_hosts = hosts
elif self.config.get('hosts', None):
_hosts = self.config['hosts']
elif self.config.get("hosts", None):
_hosts = self.config["hosts"]
else:
raise ValueError("Need to define 'hosts' in either action or in config")

# set default for empty value
_client_id = self.config.get('client_id') or self.DEFAULT_CLIENT_ID

client = KafkaClient(_hosts, client_id=_client_id)
client.ensure_topic_exists(topic)
producer = SimpleProducer(client)
result = producer.send_messages(topic, kafka_bytestring(message))

if result[0]:
return result[0]._asdict()
_client_id = self.config.get("client_id") or self.DEFAULT_CLIENT_ID

consumer = KafkaConsumer(
bootstrap_servers=_hosts.split(","), client_id=_client_id
)
try:
assert topic in consumer.topics()
except AssertionError as exc:
raise AssertionError(f"{topic} does not exist.") from exc

producer = KafkaProducer(
bootstrap_servers=_hosts.split(","),
client_id=_client_id,
)
result = producer.send(topic, message.encode("utf-8"))
record_metadata = result.get(timeout=10)
return record_metadata
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
kafka-python>=0.9.4,<1.0
kafka-python==2.0.2

0 comments on commit 02c91a4

Please sign in to comment.