diff --git a/.mapping.json b/.mapping.json index 064808c3dfed..aa8bcc766479 100644 --- a/.mapping.json +++ b/.mapping.json @@ -2073,6 +2073,7 @@ "kafka/src/kafka/impl/stats.hpp":"taxi/uservices/userver/kafka/src/kafka/impl/stats.hpp", "kafka/src/kafka/producer.cpp":"taxi/uservices/userver/kafka/src/kafka/producer.cpp", "kafka/src/kafka/producer_component.cpp":"taxi/uservices/userver/kafka/src/kafka/producer_component.cpp", + "kafka/tests/CMakeLists.txt":"taxi/uservices/userver/kafka/tests/CMakeLists.txt", "kafka/tests/configuration_test.cpp":"taxi/uservices/userver/kafka/tests/configuration_test.cpp", "kafka/tests/consumer_test.cpp":"taxi/uservices/userver/kafka/tests/consumer_test.cpp", "kafka/tests/producer_test.cpp":"taxi/uservices/userver/kafka/tests/producer_test.cpp", diff --git a/kafka/CMakeLists.txt b/kafka/CMakeLists.txt index 99d8bc5f5586..b0ce560ff8d4 100644 --- a/kafka/CMakeLists.txt +++ b/kafka/CMakeLists.txt @@ -51,4 +51,5 @@ _userver_directory_install(COMPONENT kafka FILES if (USERVER_IS_THE_ROOT_PROJECT) add_subdirectory(functional_tests) + add_subdirectory(tests) endif() diff --git a/kafka/functional_tests/integrational_tests/static_config.yaml b/kafka/functional_tests/integrational_tests/static_config.yaml index f1f117cf8ff9..3aac98db05cb 100644 --- a/kafka/functional_tests/integrational_tests/static_config.yaml +++ b/kafka/functional_tests/integrational_tests/static_config.yaml @@ -57,6 +57,7 @@ components_manager: # /// [Kafka service sample - producer static config] kafka-producer-second: + client_id: test-client delivery_timeout: 3000ms queue_buffering_max: 100ms enable_idempotence: true diff --git a/kafka/include/userver/kafka/consumer_component.hpp b/kafka/include/userver/kafka/consumer_component.hpp index 525bab3e4cc2..c28c4efd04f2 100644 --- a/kafka/include/userver/kafka/consumer_component.hpp +++ b/kafka/include/userver/kafka/consumer_component.hpp @@ -38,6 +38,7 @@ class Consumer; /// ## Static options: /// Name | Description | Default value /// ---------------------------------- | ------------------------------------------------ | --------------- +/// client_id | Client identifier. May be an arbitrary string | userver /// group_id | consumer group id (name) | -- /// topics | list of topics consumer subscribes | -- /// max_batch_size | maximum number of messages consumer waits for new message before calling a callback | 1 diff --git a/kafka/include/userver/kafka/producer_component.hpp b/kafka/include/userver/kafka/producer_component.hpp index a1e6c419e8a4..23c25a1e4d98 100644 --- a/kafka/include/userver/kafka/producer_component.hpp +++ b/kafka/include/userver/kafka/producer_component.hpp @@ -31,6 +31,7 @@ namespace kafka { /// ## Static options: /// Name | Description | Default value /// ---------------------------- | ------------------------------------------------ | --------------- +/// client_id | Client identifier. May be an arbitrary string | userver /// delivery_timeout | time a produced message waits for successful delivery | -- /// queue_buffering_max | delay to wait for messages to be transmitted to broker | -- /// enable_idempotence | whether to make producer idempotent | false diff --git a/kafka/src/kafka/consumer_component.cpp b/kafka/src/kafka/consumer_component.cpp index e0ea8427f934..f510bf6ca336 100644 --- a/kafka/src/kafka/consumer_component.cpp +++ b/kafka/src/kafka/consumer_component.cpp @@ -71,6 +71,13 @@ additionalProperties: false consumer group id. Topic partition evenly distributed between consumers with the same `group_id` + client_id: + type: string + description: | + Client identifier. + May be an arbitrary string. + Optional, but you should set this property on each instance because it enables you to more easily correlate requests on the broker with the client instance which made it, which can be helpful in debugging and troubleshooting scenarios. + defaultDescription: userver topics: type: array description: list of topics consumer subscribes diff --git a/kafka/src/kafka/impl/configuration.cpp b/kafka/src/kafka/impl/configuration.cpp index 7d6844980c30..116b8b872921 100644 --- a/kafka/src/kafka/impl/configuration.cpp +++ b/kafka/src/kafka/impl/configuration.cpp @@ -93,6 +93,7 @@ CommonConfiguration Parse(const yaml_config::YamlConfig& config, common.metadata_max_age = config["metadata_max_age"].As( common.metadata_max_age); + common.client_id = config["client_id"].As(common.client_id); return common; } @@ -244,6 +245,7 @@ void Configuration::SetCommon(const CommonConfiguration& common) { std::to_string(common.topic_metadata_refresh_interval.count())); SetOption("metadata.max.age.ms", std::to_string(common.metadata_max_age.count())); + SetOption("client.id", common.client_id); } void Configuration::SetSecurity(const SecurityConfiguration& security, diff --git a/kafka/src/kafka/impl/configuration.hpp b/kafka/src/kafka/impl/configuration.hpp index 11da730f3ebb..d8d532a5d183 100644 --- a/kafka/src/kafka/impl/configuration.hpp +++ b/kafka/src/kafka/impl/configuration.hpp @@ -23,6 +23,7 @@ struct Secret; struct CommonConfiguration final { std::chrono::milliseconds topic_metadata_refresh_interval{300000}; std::chrono::milliseconds metadata_max_age{900000}; + std::string client_id{"userver"}; }; struct SecurityConfiguration final { diff --git a/kafka/src/kafka/producer_component.cpp b/kafka/src/kafka/producer_component.cpp index 6b593dd57176..2a51c291174a 100644 --- a/kafka/src/kafka/producer_component.cpp +++ b/kafka/src/kafka/producer_component.cpp @@ -44,6 +44,13 @@ type: object description: Kafka producer component additionalProperties: false properties: + client_id: + type: string + description: | + Client identifier. + May be an arbitrary string. + Optional, but you should set this property on each instance because it enables you to more easily correlate requests on the broker with the client instance which made it, which can be helpful in debugging and troubleshooting scenarios. + defaultDescription: userver delivery_timeout: type: string description: time a produced message waits for successful delivery diff --git a/kafka/tests/CMakeLists.txt b/kafka/tests/CMakeLists.txt new file mode 100644 index 000000000000..f3d94bcbfacd --- /dev/null +++ b/kafka/tests/CMakeLists.txt @@ -0,0 +1,11 @@ +file(GLOB_RECURSE UNIT_TEST_SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/*.hpp +) + +add_executable(${PROJECT_NAME}-unittest ${UNIT_TEST_SOURCES}) +target_link_libraries(${PROJECT_NAME}-unittest PRIVATE userver::kafka userver::utest) +target_include_directories(${PROJECT_NAME}-unittest PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/../src +) +# add_google_tests(${PROJECT_NAME}-unittest) diff --git a/kafka/tests/configuration_test.cpp b/kafka/tests/configuration_test.cpp index 9071dba53088..ee3d21ecd50f 100644 --- a/kafka/tests/configuration_test.cpp +++ b/kafka/tests/configuration_test.cpp @@ -26,6 +26,8 @@ UTEST_F(ConfigurationTest, Producer) { configuration.emplace(MakeProducerConfiguration("kafka-producer"))); const kafka::impl::ProducerConfiguration default_producer{}; + EXPECT_EQ(configuration->GetOption("client.id"), + default_producer.common.client_id); EXPECT_EQ( configuration->GetOption("topic.metadata.refresh.interval.ms"), std::to_string( @@ -57,6 +59,7 @@ UTEST_F(ConfigurationTest, ProducerNonDefault) { kafka::impl::ProducerConfiguration producer_configuration{}; producer_configuration.common.topic_metadata_refresh_interval = 10ms; producer_configuration.common.metadata_max_age = 30ms; + producer_configuration.common.client_id = "test-client"; producer_configuration.delivery_timeout = 37ms; producer_configuration.queue_buffering_max = 7ms; producer_configuration.enable_idempotence = true; @@ -72,6 +75,7 @@ UTEST_F(ConfigurationTest, ProducerNonDefault) { UEXPECT_NO_THROW(configuration.emplace( MakeProducerConfiguration("kafka-producer", producer_configuration))); + EXPECT_EQ(configuration->GetOption("client.id"), "test-client"); EXPECT_EQ(configuration->GetOption("topic.metadata.refresh.interval.ms"), "10"); EXPECT_EQ(configuration->GetOption("metadata.max.age.ms"), "30"); @@ -94,6 +98,8 @@ UTEST_F(ConfigurationTest, Consumer) { configuration.emplace(MakeConsumerConfiguration("kafka-consumer"))); const kafka::impl::ConsumerConfiguration default_consumer{}; + EXPECT_EQ(configuration->GetOption("client.id"), + default_consumer.common.client_id); EXPECT_EQ( configuration->GetOption("topic.metadata.refresh.interval.ms"), std::to_string( @@ -111,6 +117,7 @@ UTEST_F(ConfigurationTest, ConsumerNonDefault) { kafka::impl::ConsumerConfiguration consumer_configuration{}; consumer_configuration.common.topic_metadata_refresh_interval = 10ms; consumer_configuration.common.metadata_max_age = 30ms; + consumer_configuration.common.client_id = "test-client"; consumer_configuration.auto_offset_reset = "largest"; consumer_configuration.rd_kafka_options["socket.keepalive.enable"] = "true"; @@ -120,6 +127,7 @@ UTEST_F(ConfigurationTest, ConsumerNonDefault) { EXPECT_EQ(configuration->GetOption("topic.metadata.refresh.interval.ms"), "10"); + EXPECT_EQ(configuration->GetOption("client.id"), "test-client"); EXPECT_EQ(configuration->GetOption("metadata.max.age.ms"), "30"); EXPECT_EQ(configuration->GetOption("security.protocol"), "plaintext"); EXPECT_EQ(configuration->GetOption("group.id"), "test-group"); diff --git a/kafka/tests/test_utils.cpp b/kafka/tests/test_utils.cpp index 4c6dfe02f694..6c095070398a 100644 --- a/kafka/tests/test_utils.cpp +++ b/kafka/tests/test_utils.cpp @@ -1,6 +1,7 @@ #include "test_utils.hpp" #include +#include #include @@ -27,13 +28,9 @@ kafka::impl::Secret MakeSecrets(std::string_view bootstrap_servers) { } // namespace -std::ostream& operator<<(std::ostream& out, const Message& message) { - return out << fmt::format( - "Message{{topic: '{}', key: '{}', payload: '{}', partition: " - "'{}'}}", - message.topic, message.key, message.payload, - message.partition ? std::to_string(message.partition.value()) - : ""); +bool operator==(const Message& lhs, const Message& rhs) { + return std::tie(lhs.topic, lhs.key, lhs.payload, lhs.partition) == + std::tie(rhs.topic, rhs.key, rhs.payload, rhs.partition); } KafkaCluster::KafkaCluster() : bootstrap_servers_(FetchBrokerList()) {} diff --git a/kafka/tests/test_utils.hpp b/kafka/tests/test_utils.hpp index a4ac1632e896..0a15485137f8 100644 --- a/kafka/tests/test_utils.hpp +++ b/kafka/tests/test_utils.hpp @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -19,11 +18,9 @@ struct Message { std::string key; std::string payload; std::optional partition; - - bool operator==(const Message& other) const = default; }; -std::ostream& operator<<(std::ostream&, const Message&); +bool operator==(const Message& lhs, const Message& rhs); class KafkaCluster : public ::testing::Test { public: