Skip to content

Commit

Permalink
feat kafka: add client.id option to static configuration
Browse files Browse the repository at this point in the history
Add client.id for producer and consumer into `CommonConfiguration`.

From kafka documentation:

>  Optional, but you should set this property on each instance because it enables you to more easily correlate requests on the broker with the client instance which made it, which can be helpful in debugging and troubleshooting scenarios.

Default value set to `userver`, because librdkafka uses `rdkafka` as default id.

I added unittests, but didn't enable them. They required real kafka instance

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/

Tests: протестировано CI

Pull Request resolved: #696
commit_hash:40a43f09515e0137e9087646221842dd6b0dc369
  • Loading branch information
Greenvi4 authored and fdr400 committed Sep 18, 2024
1 parent c4d7fbb commit f3ca2bc
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 11 deletions.
1 change: 1 addition & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,7 @@
"kafka/src/kafka/impl/stats.hpp":"taxi/uservices/userver/kafka/src/kafka/impl/stats.hpp",
"kafka/src/kafka/producer.cpp":"taxi/uservices/userver/kafka/src/kafka/producer.cpp",
"kafka/src/kafka/producer_component.cpp":"taxi/uservices/userver/kafka/src/kafka/producer_component.cpp",
"kafka/tests/CMakeLists.txt":"taxi/uservices/userver/kafka/tests/CMakeLists.txt",
"kafka/tests/configuration_test.cpp":"taxi/uservices/userver/kafka/tests/configuration_test.cpp",
"kafka/tests/consumer_test.cpp":"taxi/uservices/userver/kafka/tests/consumer_test.cpp",
"kafka/tests/producer_test.cpp":"taxi/uservices/userver/kafka/tests/producer_test.cpp",
Expand Down
1 change: 1 addition & 0 deletions kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ _userver_directory_install(COMPONENT kafka FILES

if (USERVER_IS_THE_ROOT_PROJECT)
add_subdirectory(functional_tests)
add_subdirectory(tests)
endif()
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ components_manager:
# /// [Kafka service sample - producer static config]

kafka-producer-second:
client_id: test-client
delivery_timeout: 3000ms
queue_buffering_max: 100ms
enable_idempotence: true
Expand Down
1 change: 1 addition & 0 deletions kafka/include/userver/kafka/consumer_component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Consumer;
/// ## Static options:
/// Name | Description | Default value
/// ---------------------------------- | ------------------------------------------------ | ---------------
/// client_id | Client identifier. May be an arbitrary string | userver
/// group_id | consumer group id (name) | --
/// topics | list of topics consumer subscribes | --
/// max_batch_size | maximum number of messages consumer waits for new message before calling a callback | 1
Expand Down
1 change: 1 addition & 0 deletions kafka/include/userver/kafka/producer_component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace kafka {
/// ## Static options:
/// Name | Description | Default value
/// ---------------------------- | ------------------------------------------------ | ---------------
/// client_id | Client identifier. May be an arbitrary string | userver
/// delivery_timeout | time a produced message waits for successful delivery | --
/// queue_buffering_max | delay to wait for messages to be transmitted to broker | --
/// enable_idempotence | whether to make producer idempotent | false
Expand Down
7 changes: 7 additions & 0 deletions kafka/src/kafka/consumer_component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ additionalProperties: false
consumer group id.
Topic partition evenly distributed
between consumers with the same `group_id`
client_id:
type: string
description: |
Client identifier.
May be an arbitrary string.
Optional, but you should set this property on each instance because it enables you to more easily correlate requests on the broker with the client instance which made it, which can be helpful in debugging and troubleshooting scenarios.
defaultDescription: userver
topics:
type: array
description: list of topics consumer subscribes
Expand Down
2 changes: 2 additions & 0 deletions kafka/src/kafka/impl/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ CommonConfiguration Parse(const yaml_config::YamlConfig& config,
common.metadata_max_age =
config["metadata_max_age"].As<std::chrono::milliseconds>(
common.metadata_max_age);
common.client_id = config["client_id"].As<std::string>(common.client_id);

return common;
}
Expand Down Expand Up @@ -244,6 +245,7 @@ void Configuration::SetCommon(const CommonConfiguration& common) {
std::to_string(common.topic_metadata_refresh_interval.count()));
SetOption("metadata.max.age.ms",
std::to_string(common.metadata_max_age.count()));
SetOption("client.id", common.client_id);
}

void Configuration::SetSecurity(const SecurityConfiguration& security,
Expand Down
1 change: 1 addition & 0 deletions kafka/src/kafka/impl/configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct Secret;
struct CommonConfiguration final {
std::chrono::milliseconds topic_metadata_refresh_interval{300000};
std::chrono::milliseconds metadata_max_age{900000};
std::string client_id{"userver"};
};

struct SecurityConfiguration final {
Expand Down
7 changes: 7 additions & 0 deletions kafka/src/kafka/producer_component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ type: object
description: Kafka producer component
additionalProperties: false
properties:
client_id:
type: string
description: |
Client identifier.
May be an arbitrary string.
Optional, but you should set this property on each instance because it enables you to more easily correlate requests on the broker with the client instance which made it, which can be helpful in debugging and troubleshooting scenarios.
defaultDescription: userver
delivery_timeout:
type: string
description: time a produced message waits for successful delivery
Expand Down
11 changes: 11 additions & 0 deletions kafka/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
file(GLOB_RECURSE UNIT_TEST_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/*.cpp
${CMAKE_CURRENT_SOURCE_DIR}/*.hpp
)

add_executable(${PROJECT_NAME}-unittest ${UNIT_TEST_SOURCES})
target_link_libraries(${PROJECT_NAME}-unittest PRIVATE userver::kafka userver::utest)
target_include_directories(${PROJECT_NAME}-unittest PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/../src
)
# add_google_tests(${PROJECT_NAME}-unittest)
8 changes: 8 additions & 0 deletions kafka/tests/configuration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ UTEST_F(ConfigurationTest, Producer) {
configuration.emplace(MakeProducerConfiguration("kafka-producer")));

const kafka::impl::ProducerConfiguration default_producer{};
EXPECT_EQ(configuration->GetOption("client.id"),
default_producer.common.client_id);
EXPECT_EQ(
configuration->GetOption("topic.metadata.refresh.interval.ms"),
std::to_string(
Expand Down Expand Up @@ -57,6 +59,7 @@ UTEST_F(ConfigurationTest, ProducerNonDefault) {
kafka::impl::ProducerConfiguration producer_configuration{};
producer_configuration.common.topic_metadata_refresh_interval = 10ms;
producer_configuration.common.metadata_max_age = 30ms;
producer_configuration.common.client_id = "test-client";
producer_configuration.delivery_timeout = 37ms;
producer_configuration.queue_buffering_max = 7ms;
producer_configuration.enable_idempotence = true;
Expand All @@ -72,6 +75,7 @@ UTEST_F(ConfigurationTest, ProducerNonDefault) {
UEXPECT_NO_THROW(configuration.emplace(
MakeProducerConfiguration("kafka-producer", producer_configuration)));

EXPECT_EQ(configuration->GetOption("client.id"), "test-client");
EXPECT_EQ(configuration->GetOption("topic.metadata.refresh.interval.ms"),
"10");
EXPECT_EQ(configuration->GetOption("metadata.max.age.ms"), "30");
Expand All @@ -94,6 +98,8 @@ UTEST_F(ConfigurationTest, Consumer) {
configuration.emplace(MakeConsumerConfiguration("kafka-consumer")));

const kafka::impl::ConsumerConfiguration default_consumer{};
EXPECT_EQ(configuration->GetOption("client.id"),
default_consumer.common.client_id);
EXPECT_EQ(
configuration->GetOption("topic.metadata.refresh.interval.ms"),
std::to_string(
Expand All @@ -111,6 +117,7 @@ UTEST_F(ConfigurationTest, ConsumerNonDefault) {
kafka::impl::ConsumerConfiguration consumer_configuration{};
consumer_configuration.common.topic_metadata_refresh_interval = 10ms;
consumer_configuration.common.metadata_max_age = 30ms;
consumer_configuration.common.client_id = "test-client";
consumer_configuration.auto_offset_reset = "largest";
consumer_configuration.rd_kafka_options["socket.keepalive.enable"] = "true";

Expand All @@ -120,6 +127,7 @@ UTEST_F(ConfigurationTest, ConsumerNonDefault) {

EXPECT_EQ(configuration->GetOption("topic.metadata.refresh.interval.ms"),
"10");
EXPECT_EQ(configuration->GetOption("client.id"), "test-client");
EXPECT_EQ(configuration->GetOption("metadata.max.age.ms"), "30");
EXPECT_EQ(configuration->GetOption("security.protocol"), "plaintext");
EXPECT_EQ(configuration->GetOption("group.id"), "test-group");
Expand Down
11 changes: 4 additions & 7 deletions kafka/tests/test_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "test_utils.hpp"

#include <algorithm>
#include <tuple>

#include <fmt/format.h>

Expand All @@ -27,13 +28,9 @@ kafka::impl::Secret MakeSecrets(std::string_view bootstrap_servers) {

} // namespace

std::ostream& operator<<(std::ostream& out, const Message& message) {
return out << fmt::format(
"Message{{topic: '{}', key: '{}', payload: '{}', partition: "
"'{}'}}",
message.topic, message.key, message.payload,
message.partition ? std::to_string(message.partition.value())
: "<no partition>");
bool operator==(const Message& lhs, const Message& rhs) {
return std::tie(lhs.topic, lhs.key, lhs.payload, lhs.partition) ==
std::tie(rhs.topic, rhs.key, rhs.payload, rhs.partition);
}

KafkaCluster::KafkaCluster() : bootstrap_servers_(FetchBrokerList()) {}
Expand Down
5 changes: 1 addition & 4 deletions kafka/tests/test_utils.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <deque>
#include <iostream>
#include <vector>

#include <userver/kafka/producer.hpp>
Expand All @@ -19,11 +18,9 @@ struct Message {
std::string key;
std::string payload;
std::optional<std::uint32_t> partition;

bool operator==(const Message& other) const = default;
};

std::ostream& operator<<(std::ostream&, const Message&);
bool operator==(const Message& lhs, const Message& rhs);

class KafkaCluster : public ::testing::Test {
public:
Expand Down

0 comments on commit f3ca2bc

Please sign in to comment.