Skip to content

Commit

Permalink
refactor grpc: ClientData clean up
Browse files Browse the repository at this point in the history
commit_hash:68737ed9ce03b32c0f093cea5ba3e97bd7552cac
  • Loading branch information
kpavlov00 committed Jan 23, 2025
1 parent fcd858b commit 5962d49
Show file tree
Hide file tree
Showing 24 changed files with 235 additions and 161 deletions.
10 changes: 7 additions & 3 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1913,13 +1913,15 @@
"grpc/include/userver/ugrpc/client/client_factory_component.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_factory_component.hpp",
"grpc/include/userver/ugrpc/client/client_factory_settings.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_factory_settings.hpp",
"grpc/include/userver/ugrpc/client/client_qos.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_qos.hpp",
"grpc/include/userver/ugrpc/client/client_settings.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_settings.hpp",
"grpc/include/userver/ugrpc/client/common_component.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/common_component.hpp",
"grpc/include/userver/ugrpc/client/exceptions.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/exceptions.hpp",
"grpc/include/userver/ugrpc/client/fwd.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/fwd.hpp",
"grpc/include/userver/ugrpc/client/generic.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/generic.hpp",
"grpc/include/userver/ugrpc/client/generic_client.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/generic_client.hpp",
"grpc/include/userver/ugrpc/client/impl/async_method_invocation.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/async_method_invocation.hpp",
"grpc/include/userver/ugrpc/client/impl/async_methods.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/async_methods.hpp",
"grpc/include/userver/ugrpc/client/impl/call_params.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/call_params.hpp",
"grpc/include/userver/ugrpc/client/impl/channel_factory.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/channel_factory.hpp",
"grpc/include/userver/ugrpc/client/impl/client_data.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/client_data.hpp",
"grpc/include/userver/ugrpc/client/impl/codegen_declarations.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/codegen_declarations.hpp",
"grpc/include/userver/ugrpc/client/impl/codegen_definitions.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/codegen_definitions.hpp",
Expand All @@ -1943,7 +1945,7 @@
"grpc/include/userver/ugrpc/impl/protobuf_collector.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/protobuf_collector.hpp",
"grpc/include/userver/ugrpc/impl/queue_runner.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/queue_runner.hpp",
"grpc/include/userver/ugrpc/impl/span.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/span.hpp",
"grpc/include/userver/ugrpc/impl/static_metadata.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/static_metadata.hpp",
"grpc/include/userver/ugrpc/impl/static_service_metadata.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/static_service_metadata.hpp",
"grpc/include/userver/ugrpc/impl/statistics.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/statistics.hpp",
"grpc/include/userver/ugrpc/impl/statistics_scope.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/statistics_scope.hpp",
"grpc/include/userver/ugrpc/impl/statistics_storage.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/statistics_storage.hpp",
Expand Down Expand Up @@ -2001,9 +2003,11 @@
"grpc/src/ugrpc/client/channels.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/channels.cpp",
"grpc/src/ugrpc/client/client_factory.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/client_factory.cpp",
"grpc/src/ugrpc/client/client_factory_component.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/client_factory_component.cpp",
"grpc/src/ugrpc/client/client_factory_settings.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/client_factory_settings.cpp",
"grpc/src/ugrpc/client/client_settings.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/client_settings.cpp",
"grpc/src/ugrpc/client/common_component.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/common_component.cpp",
"grpc/src/ugrpc/client/exceptions.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/exceptions.cpp",
"grpc/src/ugrpc/client/generic.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/generic.cpp",
"grpc/src/ugrpc/client/generic_client.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/generic_client.cpp",
"grpc/src/ugrpc/client/impl/async_method_invocation.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/async_method_invocation.cpp",
"grpc/src/ugrpc/client/impl/async_methods.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/async_methods.cpp",
"grpc/src/ugrpc/client/impl/call_params.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/call_params.cpp",
Expand Down
40 changes: 2 additions & 38 deletions grpc/include/userver/ugrpc/client/client_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
#include <string>
#include <utility>

#include <grpcpp/completion_queue.h>
#include <grpcpp/security/credentials.h>

#include <userver/dynamic_config/source.hpp>
#include <userver/engine/task/task_processor_fwd.hpp>
#include <userver/testsuite/grpc_control.hpp>

#include <userver/ugrpc/client/client_factory_settings.hpp>
#include <userver/ugrpc/client/fwd.hpp>
#include <userver/ugrpc/client/client_settings.hpp>
#include <userver/ugrpc/client/middlewares/base.hpp>

USERVER_NAMESPACE_BEGIN
Expand All @@ -26,39 +23,6 @@ class CompletionQueuePoolBase;

namespace ugrpc::client {

/// Settings relating to creation of a code-generated client
struct ClientSettings final {
/// **(Required)**
/// The name of the client, for diagnostics, credentials and middlewares.
std::string client_name;

/// **(Required)**
/// The URI to connect to, e.g. `http://my.domain.com:8080`.
/// Should not include any HTTP path, just schema, domain name and port. Unix
/// sockets are also supported. For details, see:
/// https://grpc.github.io/grpc/cpp/md_doc_naming.html
std::string endpoint;

/// **(Optional)**
/// The name of the QOS
/// @ref scripts/docs/en/userver/dynamic_config.md "dynamic config"
/// that will be applied automatically to every RPC.
///
/// Timeout from QOS config is ignored if:
///
/// * an explicit `qos` parameter is specified at RPC creation, or
/// * deadline is specified in the `client_context` passed at RPC creation.
///
/// ## Client QOS config definition sample
///
/// @snippet grpc/tests/tests/unit_test_client_qos.hpp qos config key
const dynamic_config::Key<ClientQos>* client_qos{nullptr};

/// **(Optional)**
/// Dedicated high-load methods that have separate channels
DedicatedMethodsConfig dedicated_methods_config{};
};

/// @ingroup userver_clients
///
/// @brief Creates gRPC clients.
Expand Down Expand Up @@ -92,7 +56,7 @@ class ClientFactory final {
ugrpc::impl::CompletionQueuePoolBase& completion_queues,
ugrpc::impl::StatisticsStorage& statistics_storage,
testsuite::GrpcControl& testsuite_grpc,
dynamic_config::Source source
dynamic_config::Source config_source
);
/// @endcond

Expand Down
6 changes: 3 additions & 3 deletions grpc/include/userver/ugrpc/client/client_factory_settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ USERVER_NAMESPACE_BEGIN

namespace ugrpc::client {

// method name -> count of channels
using DedicatedMethodsConfig = std::unordered_map<std::string, std::size_t>;

/// Settings relating to the ClientFactory
struct ClientFactorySettings final {
/// gRPC channel credentials, none by default
Expand All @@ -41,6 +38,9 @@ struct ClientFactorySettings final {
std::size_t channel_count{1};
};

std::shared_ptr<grpc::ChannelCredentials>
GetClientCredentials(const ClientFactorySettings& client_factory_settings, const std::string& client_name);

} // namespace ugrpc::client

USERVER_NAMESPACE_END
58 changes: 58 additions & 0 deletions grpc/include/userver/ugrpc/client/client_settings.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

/// @file userver/ugrpc/client/client_settings.hpp
/// @brief @copybrief ugrpc::client::ClientSettings

#include <cstddef>
#include <string>
#include <unordered_map>

#include <userver/dynamic_config/snapshot.hpp>

#include <userver/ugrpc/client/fwd.hpp>

USERVER_NAMESPACE_BEGIN

namespace ugrpc::client {

// rpc method name -> count of channels
using DedicatedMethodsConfig = std::unordered_map<std::string, std::size_t>;

/// Settings relating to creation of a code-generated client
struct ClientSettings final {
/// **(Required)**
/// The name of the client, for diagnostics, credentials and middlewares.
std::string client_name;

/// **(Required)**
/// The URI to connect to, e.g. `http://my.domain.com:8080`.
/// Should not include any HTTP path, just schema, domain name and port. Unix
/// sockets are also supported. For details, see:
/// https://grpc.github.io/grpc/cpp/md_doc_naming.html
std::string endpoint;

/// **(Optional)**
/// The name of the QOS
/// @ref scripts/docs/en/userver/dynamic_config.md "dynamic config"
/// that will be applied automatically to every RPC.
///
/// Timeout from QOS config is ignored if:
///
/// * an explicit `qos` parameter is specified at RPC creation, or
/// * deadline is specified in the `client_context` passed at RPC creation.
///
/// ## Client QOS config definition sample
///
/// @snippet grpc/tests/tests/unit_test_client_qos.hpp qos config key
const dynamic_config::Key<ClientQos>* client_qos{nullptr};

/// **(Optional)**
/// Dedicated high-load methods that have separate channels
DedicatedMethodsConfig dedicated_methods_config{};
};

std::size_t GetMethodChannelCount(const DedicatedMethodsConfig& dedicated_methods_config, std::string_view method_name);

} // namespace ugrpc::client

USERVER_NAMESPACE_END
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

/// @file userver/ugrpc/client/generic.hpp
/// @file userver/ugrpc/client/generic_client.hpp
/// @brief @copybrief ugrpc::client::GenericClient

#include <optional>
Expand Down
49 changes: 49 additions & 0 deletions grpc/include/userver/ugrpc/client/impl/channel_factory.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <grpcpp/channel.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>

#include <userver/engine/async.hpp>

#include <userver/ugrpc/impl/to_string.hpp>

USERVER_NAMESPACE_BEGIN

namespace ugrpc::client::impl {

class ChannelFactory final {
public:
ChannelFactory(
engine::TaskProcessor& blocking_task_processor,
const std::string& endpoint,
std::shared_ptr<grpc::ChannelCredentials> credentials,
const grpc::ChannelArguments& channel_args
)
: blocking_task_processor_(blocking_task_processor),
endpoint_{ugrpc::impl::ToGrpcString(endpoint)},
credentials_{std::move(credentials)},
channel_args_{channel_args} {}

std::shared_ptr<grpc::Channel> CreateChannel() const {
return engine::AsyncNoSpan(
blocking_task_processor_,
grpc::CreateCustomChannel,
std::ref(endpoint_),
std::ref(credentials_),
std::ref(channel_args_)
)
.Get();
}

private:
engine::TaskProcessor& blocking_task_processor_;
grpc::string endpoint_;
std::shared_ptr<grpc::ChannelCredentials> credentials_;
const grpc::ChannelArguments& channel_args_;
};

} // namespace ugrpc::client::impl

USERVER_NAMESPACE_END
69 changes: 36 additions & 33 deletions grpc/include/userver/ugrpc/client/impl/client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,20 @@
#include <optional>
#include <utility>

#include <grpcpp/channel.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/security/credentials.h>

#include <userver/dynamic_config/source.hpp>
#include <userver/engine/task/task_processor_fwd.hpp>
#include <userver/testsuite/grpc_control.hpp>
#include <userver/utils/fixed_array.hpp>

#include <userver/ugrpc/client/client_factory_settings.hpp>
#include <userver/ugrpc/client/client_settings.hpp>
#include <userver/ugrpc/client/fwd.hpp>
#include <userver/ugrpc/client/impl/channel_factory.hpp>
#include <userver/ugrpc/client/middlewares/fwd.hpp>
#include <userver/ugrpc/impl/static_metadata.hpp>
#include <userver/ugrpc/impl/static_service_metadata.hpp>
#include <userver/ugrpc/impl/statistics.hpp>
#include <userver/ugrpc/impl/to_string.hpp>

USERVER_NAMESPACE_BEGIN

Expand All @@ -28,10 +27,6 @@ class StatisticsStorage;
class CompletionQueuePoolBase;
} // namespace ugrpc::impl

namespace ugrpc::client {
struct ClientFactorySettings;
}

namespace ugrpc::client::impl {

/// Contains all non-code-generated dependencies for creating a gRPC client
Expand All @@ -44,7 +39,7 @@ struct ClientDependencies final {
const dynamic_config::Source config_source;
testsuite::GrpcControl& testsuite_grpc;
const dynamic_config::Key<ClientQos>* qos{nullptr};
const ClientFactorySettings& settings;
const ClientFactorySettings& client_factory_settings;
engine::TaskProcessor& channel_task_processor;
DedicatedMethodsConfig dedicated_methods_config;
};
Expand All @@ -66,14 +61,18 @@ class ClientData final {
: dependencies_(std::move(dependencies)),
metadata_(metadata),
service_statistics_(&GetServiceStatistics()),
channels_(CreateChannels(dependencies_)),
channel_factory_(CreateChannelFactory(dependencies_)),
channels_(CreateChannels(channel_factory_, dependencies_.client_factory_settings.channel_count)),
stubs_(MakeStubs<Service>(channels_)),
dedicated_stubs_(MakeDedicatedStubs<Service>(dependencies_, metadata)) {}
dedicated_stubs_(
MakeDedicatedStubs<Service>(channel_factory_, *metadata_, dependencies_.dedicated_methods_config)
) {}

template <typename Service>
ClientData(ClientDependencies&& dependencies, GenericClientTag, std::in_place_type_t<Service>)
: dependencies_(std::move(dependencies)),
channels_(CreateChannels(dependencies_)),
channel_factory_(CreateChannelFactory(dependencies_)),
channels_(CreateChannels(channel_factory_, dependencies_.client_factory_settings.channel_count)),
stubs_(MakeStubs<Service>(channels_)) {}

ClientData(ClientData&&) noexcept = default;
Expand Down Expand Up @@ -118,16 +117,10 @@ class ClientData final {
std::size_t GetDedicatedChannelCount(std::size_t method_id) const;

private:
static std::shared_ptr<grpc::Channel>
CreateChannelImpl(const ClientDependencies& dependencies, const grpc::string& endpoint);
static ChannelFactory CreateChannelFactory(const ClientDependencies& dependencies);

static utils::FixedArray<std::shared_ptr<grpc::Channel>> CreateChannels(const ClientDependencies& dependencies);

static std::size_t GetDedicatedChannelCountImpl(
const ClientDependencies& dependencies,
std::size_t method_id,
const ugrpc::impl::StaticServiceMetadata& meta
);
static utils::FixedArray<std::shared_ptr<grpc::Channel>>
CreateChannels(const ChannelFactory& channel_factory, std::size_t channel_count);

using StubDeleterType = void (*)(void*);
using StubPtr = std::unique_ptr<void, StubDeleterType>;
Expand All @@ -140,36 +133,46 @@ class ClientData final {
}

template <typename Service>
static utils::FixedArray<StubPtr> MakeStubs(const utils::FixedArray<std::shared_ptr<grpc::Channel>>& channels) {
static StubPtr MakeStub(const std::shared_ptr<grpc::Channel>& channel) {
return StubPtr(Service::NewStub(channel).release(), &StubDeleter<Service>);
}

template <typename Service>
static StubPool MakeStubs(const utils::FixedArray<std::shared_ptr<grpc::Channel>>& channels) {
return utils::GenerateFixedArray(channels.size(), [&](std::size_t index) {
return StubPtr(Service::NewStub(channels[index]).release(), &StubDeleter<Service>);
return MakeStub<Service>(channels[index]);
});
}

template <typename Service>
static utils::FixedArray<StubPool>
MakeDedicatedStubs(const ClientDependencies& dependencies, const ugrpc::impl::StaticServiceMetadata& meta) {
const auto& method_full_names = meta.method_full_names;
const auto endpoint_string = ugrpc::impl::ToGrpcString(dependencies.endpoint);
return utils::GenerateFixedArray(method_full_names.size(), [&](std::size_t method_id) {
const auto dedicated_channel_count = GetDedicatedChannelCountImpl(dependencies, method_id, meta);
return utils::GenerateFixedArray(dedicated_channel_count, [&](std::size_t) {
return StubPtr(Service::NewStub(CreateChannelImpl(dependencies, endpoint_string)).release(), &StubDeleter<Service>);
static utils::FixedArray<StubPool> MakeDedicatedStubs(
const ChannelFactory& channel_factory,
const ugrpc::impl::StaticServiceMetadata& metadata,
const DedicatedMethodsConfig& dedicated_methods_config
) {
return utils::GenerateFixedArray(GetMethodsCount(metadata), [&](std::size_t method_id) {
const auto method_channel_count =
GetMethodChannelCount(dedicated_methods_config, GetMethodName(metadata, method_id));
return utils::GenerateFixedArray(method_channel_count, [&](std::size_t) {
const auto channel = channel_factory.CreateChannel();
return MakeStub<Service>(channel);
});
});
}

const StubPtr& NextStubPtr(const utils::FixedArray<StubPtr>& stubs) const;
const StubPtr& NextStubPtr(const StubPool& stubs) const;

ugrpc::impl::ServiceStatistics& GetServiceStatistics();

ClientDependencies dependencies_;
std::optional<ugrpc::impl::StaticServiceMetadata> metadata_{std::nullopt};
ugrpc::impl::ServiceStatistics* service_statistics_{nullptr};

ChannelFactory channel_factory_;

utils::FixedArray<std::shared_ptr<grpc::Channel>> channels_;

utils::FixedArray<StubPtr> stubs_;
StubPool stubs_;
// method_id -> stub_pool
utils::FixedArray<StubPool> dedicated_stubs_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
#include <userver/ugrpc/client/qos.hpp>
#include <userver/ugrpc/client/response_future.hpp>
#include <userver/ugrpc/client/rpc.hpp>
#include <userver/ugrpc/impl/static_metadata.hpp>
#include <userver/ugrpc/impl/static_service_metadata.hpp>
#include <userver/ugrpc/impl/statistics.hpp>
Loading

0 comments on commit 5962d49

Please sign in to comment.