Skip to content

Commit

Permalink
feat docs: introduce Kafka tutorial
Browse files Browse the repository at this point in the history
commit_hash:04d358d3fe66734f7633845b9717ee0b0c338ea7
  • Loading branch information
fdr400 committed Oct 4, 2024
1 parent a5c38e6 commit 9998b0c
Show file tree
Hide file tree
Showing 30 changed files with 448 additions and 45 deletions.
1 change: 1 addition & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -3310,6 +3310,7 @@
"scripts/docs/en/userver/tutorial/hello_service.md":"taxi/uservices/userver/scripts/docs/en/userver/tutorial/hello_service.md",
"scripts/docs/en/userver/tutorial/http_caching.md":"taxi/uservices/userver/scripts/docs/en/userver/tutorial/http_caching.md",
"scripts/docs/en/userver/tutorial/json_to_yaml.md":"taxi/uservices/userver/scripts/docs/en/userver/tutorial/json_to_yaml.md",
"scripts/docs/en/userver/tutorial/kafka_service.md":"taxi/uservices/userver/scripts/docs/en/userver/tutorial/kafka_service.md",
"scripts/docs/en/userver/tutorial/mongo_service.md":"taxi/uservices/userver/scripts/docs/en/userver/tutorial/mongo_service.md",
"scripts/docs/en/userver/tutorial/multipart_service.md":"taxi/uservices/userver/scripts/docs/en/userver/tutorial/multipart_service.md",
"scripts/docs/en/userver/tutorial/postgres_service.md":"taxi/uservices/userver/scripts/docs/en/userver/tutorial/postgres_service.md",
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CMAKE_RELEASE_FLAGS += -DCMAKE_BUILD_TYPE=Release $(CMAKE_COMMON_FLAGS)
.PHONY: all
all: test-debug test-release

# Requires doxygen 1.9.8+
# Requires doxygen 1.10.0+
.PHONY: docs
docs:
@rm -rf docs/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class HandlerKafkaProducers final
const server::http::HttpRequest& request) const;

private:
std::unordered_map<std::string, kafka::Producer&> producer_by_topic_;
std::unordered_map<std::string, const kafka::Producer&> producer_by_topic_;
};

} // namespace functional_tests
Expand Down
6 changes: 5 additions & 1 deletion kafka/include/userver/kafka/consumer_component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,16 @@ class Consumer;

class ConsumerComponent final : public components::ComponentBase {
public:
static constexpr std::string_view kName = "kafka-consumer";
/// @ingroup userver_component_names
/// @brief The default name of kafka::ConsumerComponent component
static constexpr std::string_view kName{"kafka-consumer"};

ConsumerComponent(const components::ComponentConfig& config,
const components::ComponentContext& context);
~ConsumerComponent() override;

/// @brief Returns consumer instance.
/// @see kafka::ConsumerScope
ConsumerScope GetConsumer();

static yaml_config::Schema GetStaticConfigSchema();
Expand Down
4 changes: 2 additions & 2 deletions kafka/include/userver/kafka/consumer_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ class Consumer;
///
/// Common usage:
///
/// @snippet samples/kafka_service/main.cpp Kafka service sample - consumer usage
/// @snippet samples/kafka_service/src/consumer_handler.cpp Kafka service sample - consumer usage
///
/// ## Important implementation details
///
/// ConsumerScope holds reference to `impl::Consumer` that actually
/// ConsumerScope holds reference to kafka::impl::Consumer that actually
/// represents the Apache Kafka Balanced Consumer.
///
/// It exposes the API for asynchronous message batches processing that is
Expand Down
4 changes: 4 additions & 0 deletions kafka/include/userver/kafka/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class SendException : public std::runtime_error {
public:
using std::runtime_error::runtime_error;

/// @brief Returns whether it makes sense to retry failed send.
///
/// @see
/// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md8
bool IsRetryable() const noexcept;

protected:
Expand Down
6 changes: 3 additions & 3 deletions kafka/include/userver/kafka/producer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class Producer final {
///
/// @note Use SendException::IsRetryable method to understand whether there is
/// a sense to retry the message sending.
/// @snippet kafka/tests/producer_test.cpp Producer retryable error
/// @snippet kafka/tests/producer_kafkatest.cpp Producer retryable error
void Send(const std::string& topic_name, std::string_view key,
std::string_view message,
std::optional<std::uint32_t> partition = std::nullopt) const;
Expand All @@ -101,14 +101,14 @@ class Producer final {
/// requests may be retried by the library (for instance, in case of network
/// blink). Though, the order messages are written to partition may differ
/// from the order messages are initially sent
/// @snippet kafka/tests/producer_test.cpp Producer batch send async
/// @snippet kafka/tests/producer_kafkatest.cpp Producer batch send async
[[nodiscard]] engine::TaskWithResult<void> SendAsync(
std::string topic_name, std::string key, std::string message,
std::optional<std::uint32_t> partition = std::nullopt) const;

/// @brief Dumps per topic messages produce statistics. No expected to be
/// called manually.
/// @see impl/stats.hpp
/// @see kafka/impl/stats.hpp
void DumpMetric(utils::statistics::Writer& writer) const;

private:
Expand Down
8 changes: 6 additions & 2 deletions kafka/include/userver/kafka/producer_component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ namespace kafka {

class ProducerComponent final : public components::ComponentBase {
public:
static constexpr std::string_view kName = "kafka-producer";
/// @ingroup userver_component_names
/// @brief The default name of kafka::ProducerComponent component
static constexpr std::string_view kName{"kafka-producer"};

ProducerComponent(const components::ComponentConfig& config,
const components::ComponentContext& context);
~ProducerComponent() override;

Producer& GetProducer();
/// @brief Returns a producer instance reference.
/// @see kafka::Producer
const Producer& GetProducer();

static yaml_config::Schema GetStaticConfigSchema();

Expand Down
2 changes: 1 addition & 1 deletion kafka/src/kafka/producer_component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ProducerComponent::ProducerComponent(

ProducerComponent::~ProducerComponent() { statistics_holder_.Unregister(); }

Producer& ProducerComponent::GetProducer() { return producer_; }
const Producer& ProducerComponent::GetProducer() { return producer_; }

yaml_config::Schema ProducerComponent::GetStaticConfigSchema() {
return yaml_config::MergeSchemas<components::ComponentBase>(R"(
Expand Down
15 changes: 12 additions & 3 deletions kafka/utest/include/userver/kafka/utest/kafka_fixture.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <atomic>
#include <chrono>
#include <deque>
#include <vector>
Expand All @@ -15,6 +16,8 @@ USERVER_NAMESPACE_BEGIN

namespace kafka::utest {

/// @ingroup userver_utest
///
/// @brief Message owning data wrapper for unit tests.
struct Message {
std::string topic;
Expand All @@ -25,6 +28,8 @@ struct Message {

bool operator==(const Message& lhs, const Message& rhs);

/// @ingroup userver_utest
///
/// @brief Helper for Kafka unit testing.
///
/// KafkaCluster is useful for
Expand Down Expand Up @@ -80,16 +85,20 @@ class KafkaCluster : public ::testing::Test {
std::size_t count, std::function<std::string(std::size_t)> nameGenerator,
impl::ProducerConfiguration configuration = {});

/// @brief Creates temporary producer and send messages span.
void SendMessages(utils::span<const Message> messages);

impl::Consumer MakeConsumer(const std::string& name,
const std::vector<std::string>& topics,
impl::ConsumerConfiguration configuration = {},
impl::ConsumerExecutionParams params = {});

std::vector<Message> ReceiveMessages(impl::Consumer& consumer,
std::size_t expected_messages_count,
bool commit_after_receive = true);
/// @brief Starts consumer, waits until `expected_message_count` messages are
/// consumed, calls `user_callback` if set, stops consumer.
std::vector<Message> ReceiveMessages(
impl::Consumer& consumer, std::size_t expected_messages_count,
bool commit_after_receive = true,
std::optional<std::function<void(MessageBatchView)>> user_callback = {});

private:
impl::Secret AddBootstrapServers(impl::Secret secrets) const;
Expand Down
7 changes: 6 additions & 1 deletion kafka/utest/src/kafka/utest/kafka_fixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,19 +147,24 @@ impl::Consumer KafkaCluster::MakeConsumer(

std::vector<Message> KafkaCluster::ReceiveMessages(
impl::Consumer& consumer, std::size_t expected_messages_count,
bool commit_after_receive) {
bool commit_after_receive,
std::optional<std::function<void(MessageBatchView)>> user_callback) {
std::vector<Message> received_messages;

engine::SingleUseEvent event;
auto consumer_scope = consumer.MakeConsumerScope();
consumer_scope.Start(
[&received_messages, expected_messages_count, &event, &consumer_scope,
&user_callback,
commit = commit_after_receive](MessageBatchView messages) {
for (const auto& message : messages) {
received_messages.push_back(Message{
message.GetTopic(), std::string{message.GetKey()},
std::string{message.GetPayload()}, message.GetPartition()});
}
if (user_callback) {
(*user_callback)(messages);
}
if (commit) {
consumer_scope.AsyncCommit();
}
Expand Down
6 changes: 6 additions & 0 deletions samples/kafka_service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ target_include_directories(${PROJECT_NAME}_objs PUBLIC src/)
add_executable(${PROJECT_NAME} "main.cpp")
target_link_libraries(${PROJECT_NAME} PRIVATE ${PROJECT_NAME}_objs)

# /// [Kafka service sample - kafka unit test cmake]
add_executable(${PROJECT_NAME}-unittest "unittest/kafka_test.cpp")
target_link_libraries(${PROJECT_NAME}-unittest
PRIVATE ${PROJECT_NAME}_objs userver::kafka-utest)
Expand All @@ -31,9 +32,14 @@ userver_add_utest(
"TESTSUITE_KAFKA_CUSTOM_TOPICS=test-topic-1:1,test-topic-2:1"
)

# /// [Kafka service sample - kafka unit test cmake]

# /// [Kafka service sample - kafka functional test cmake]
userver_testsuite_add_simple(
TEST_ENV
"TESTSUITE_KAFKA_SERVER_START_TIMEOUT=120.0"
"TESTSUITE_KAFKA_SERVER_HOST=[::1]"
"TESTSUITE_KAFKA_CUSTOM_TOPICS=test-topic-1:1,test-topic-2:1"
)

# /// [Kafka service sample - kafka functional test cmake]
2 changes: 2 additions & 0 deletions samples/kafka_service/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <consumer_handler.hpp>
#include <producer_handler.hpp>

/// [Kafka service sample - main]
int main(int argc, char* argv[]) {
const auto components_list =
components::MinimalServerComponentList()
Expand All @@ -33,3 +34,4 @@ int main(int argc, char* argv[]) {

return utils::DaemonMain(argc, argv, components_list);
}
/// [Kafka service sample - main]
2 changes: 2 additions & 0 deletions samples/kafka_service/src/consume.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

namespace kafka_sample {

/// [Kafka service sample - consume]
void Consume(kafka::MessageBatchView messages) {
for (const auto& message : messages) {
if (!message.GetTimestamp().has_value()) {
Expand All @@ -19,5 +20,6 @@ void Consume(kafka::MessageBatchView messages) {
}());
}
}
/// [Kafka service sample - consume]

} // namespace kafka_sample
6 changes: 6 additions & 0 deletions samples/kafka_service/src/consumer_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@

#include <string_view>

/// [Kafka service sample - consumer include]

#include <userver/kafka/consumer_scope.hpp>

/// [Kafka service sample - consumer include]

#include <userver/components/component_base.hpp>

#include <userver/utest/using_namespace_userver.hpp>

namespace kafka_sample {

/// [Kafka service sample - consumer component declaration]
class ConsumerHandler final : public components::ComponentBase {
public:
static constexpr std::string_view kName{"consumer-handler"};
Expand All @@ -21,5 +26,6 @@ class ConsumerHandler final : public components::ComponentBase {
// Subscriptions must be the last fields! Add new fields above this comment.
kafka::ConsumerScope consumer_;
};
/// [Kafka service sample - consumer component declaration]

} // namespace kafka_sample
2 changes: 2 additions & 0 deletions samples/kafka_service/src/produce.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace kafka_sample {

/// [Kafka service sample - produce]
SendStatus Produce(const kafka::Producer& producer,
const RequestMessage& message) {
try {
Expand All @@ -14,5 +15,6 @@ SendStatus Produce(const kafka::Producer& producer,
: SendStatus::kErrorNonRetryable;
}
}
/// [Kafka service sample - produce]

} // namespace kafka_sample
1 change: 1 addition & 0 deletions samples/kafka_service/src/produce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ enum class SendStatus {
kErrorNonRetryable,
};

/// @brief Example message data.
struct RequestMessage {
std::string topic;
std::string key;
Expand Down
4 changes: 4 additions & 0 deletions samples/kafka_service/src/producer_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ bool IsCorrectRequest(const formats::json::Value& request_json) {

} // namespace

/// [Kafka service sample - producer component find]
ProducerHandler::ProducerHandler(const components::ComponentConfig& config,
const components::ComponentContext& context)
: server::handlers::HttpHandlerJsonBase{config, context},
producer_{
context.FindComponent<kafka::ProducerComponent>().GetProducer()} {}
/// [Kafka service sample - producer component find]

/// [Kafka service sample - producer handler implementation]
formats::json::Value ProducerHandler::HandleRequestJsonThrow(
const server::http::HttpRequest& request,
const formats::json::Value& request_json,
Expand All @@ -71,5 +74,6 @@ formats::json::Value ProducerHandler::HandleRequestJsonThrow(
}
UINVARIANT(false, "Unknown produce status");
}
/// [Kafka service sample - producer handler implementation]

} // namespace kafka_sample
8 changes: 7 additions & 1 deletion samples/kafka_service/src/producer_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@

#include <string_view>

/// [Kafka service sample - producer include]

#include <userver/kafka/producer.hpp>

/// [Kafka service sample - producer include]

#include <userver/server/handlers/http_handler_json_base.hpp>

#include <userver/utest/using_namespace_userver.hpp>

namespace kafka_sample {

/// [Kafka service sample - producer component declaration]
class ProducerHandler final : public server::handlers::HttpHandlerJsonBase {
public:
static constexpr std::string_view kName{"producer-handler"};
Expand All @@ -23,7 +28,8 @@ class ProducerHandler final : public server::handlers::HttpHandlerJsonBase {
server::request::RequestContext& context) const override;

private:
kafka::Producer& producer_;
const kafka::Producer& producer_;
};
/// [Kafka service sample - producer component declaration]

} // namespace kafka_sample
17 changes: 16 additions & 1 deletion samples/kafka_service/testsuite/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
import pytest

# /// [Kafka service sample - kafka testsuite include]

pytest_plugins = ['pytest_userver.plugins.kafka']

# /// [Kafka service sample - kafka testsuite include]


# /// [Kafka service sample - secdist]
@pytest.fixture(scope='session')
def service_env(kafka_secdist) -> dict:
"""kafka_secist fixture generates the secdist config"""
"""
Note: kafka_secist fixture generates the secdist config
Expected secdist format is:
"kafka_settings": {
"<kafka-component-name>": {
"brokers": "<brokers comma-separated endpoint list>",
"username": "SASL2 username (may be empty if use PLAINTEXT)",
"password": "SASL2 password (may be empty if use PLAINTEXT)"
}
}
"""

return {'SECDIST_CONFIG': kafka_secdist}
# /// [Kafka service sample - secdist]
Expand Down
Loading

0 comments on commit 9998b0c

Please sign in to comment.