From 02c91a456a3f8fd9767977288fa3b81f2f0c8598 Mon Sep 17 00:00:00 2001 From: Sekhar Sankaramanchi Date: Sat, 11 Feb 2023 13:44:44 -0700 Subject: [PATCH] fix kafka-python actions for python 3.8 --- CHANGES.md | 2 ++ actions/produce.py | 41 +++++++++++++++++++++++++++-------------- requirements.txt | 2 +- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0eb2d72..f211152 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,7 @@ # Changelog +* Fix kafka-python actions for python 3.8. Fixes #15 + ## 1.0.0 * Drop Python 2.7 support diff --git a/actions/produce.py b/actions/produce.py index ad45397..cd87834 100644 --- a/actions/produce.py +++ b/actions/produce.py @@ -1,13 +1,18 @@ +""" +Kafka Producer Action for stackstorm +""" +import json + from st2common.runners.base_action import Action -from kafka import SimpleProducer, KafkaClient -from kafka.util import kafka_bytestring +from kafka import KafkaProducer, KafkaConsumer class ProduceMessageAction(Action): """ Action to send messages to Apache Kafka system. """ - DEFAULT_CLIENT_ID = 'st2-kafka-producer' + + DEFAULT_CLIENT_ID = "st2-kafka-producer" def run(self, topic, message, hosts=None): """ @@ -28,18 +33,26 @@ def run(self, topic, message, hosts=None): if hosts: _hosts = hosts - elif self.config.get('hosts', None): - _hosts = self.config['hosts'] + elif self.config.get("hosts", None): + _hosts = self.config["hosts"] else: raise ValueError("Need to define 'hosts' in either action or in config") # set default for empty value - _client_id = self.config.get('client_id') or self.DEFAULT_CLIENT_ID - - client = KafkaClient(_hosts, client_id=_client_id) - client.ensure_topic_exists(topic) - producer = SimpleProducer(client) - result = producer.send_messages(topic, kafka_bytestring(message)) - - if result[0]: - return result[0]._asdict() + _client_id = self.config.get("client_id") or self.DEFAULT_CLIENT_ID + + consumer = KafkaConsumer( + bootstrap_servers=_hosts.split(","), client_id=_client_id + ) + try: + assert topic in consumer.topics() + except AssertionError as exc: + raise AssertionError(f"{topic} does not exist.") from exc + + producer = KafkaProducer( + bootstrap_servers=_hosts.split(","), + client_id=_client_id, + ) + result = producer.send(topic, message.encode("utf-8")) + record_metadata = result.get(timeout=10) + return record_metadata diff --git a/requirements.txt b/requirements.txt index e2dbc6e..7aedb58 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -kafka-python>=0.9.4,<1.0 +kafka-python==2.0.2