From 05f3589ffc58dab9164e787b2bfcdcc88753fb36 Mon Sep 17 00:00:00 2001 From: Aleksandr Draganov Date: Fri, 26 Apr 2024 15:52:07 +0000 Subject: [PATCH 1/4] small fix for build --- mysql/include/userver/storages/mysql/query.hpp | 4 ++-- mysql/src/storages/mysql/query.cpp | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mysql/include/userver/storages/mysql/query.hpp b/mysql/include/userver/storages/mysql/query.hpp index 16a0b55351e7..4c14f7b99829 100644 --- a/mysql/include/userver/storages/mysql/query.hpp +++ b/mysql/include/userver/storages/mysql/query.hpp @@ -19,10 +19,10 @@ class Query final { using Name = utils::StrongTypedef; /// @brief Query constructor - Query(const char* statement, std::optional name = std::nullopt); + Query(const char* statement); /// @brief Query constructor - Query(std::string statement, std::optional name = std::nullopt); + Query(std::string statement); /// @brief Get query statement const std::string& GetStatement() const; diff --git a/mysql/src/storages/mysql/query.cpp b/mysql/src/storages/mysql/query.cpp index 77ffa0db2b48..7cc4faa27da3 100644 --- a/mysql/src/storages/mysql/query.cpp +++ b/mysql/src/storages/mysql/query.cpp @@ -4,11 +4,11 @@ USERVER_NAMESPACE_BEGIN namespace storages::mysql { -Query::Query(const char* statement, std::optional name) - : statement_{statement}, name_{std::move(name)} {} +Query::Query(const char* statement) + : statement_{statement} {} -Query::Query(std::string statement, std::optional name) - : statement_{std::move(statement)}, name_{std::move(name)} {} +Query::Query(std::string statement) + : statement_{std::move(statement)} {} const std::string& Query::GetStatement() const { return statement_; } From d432b79ad783458c30bd8126477a72326cfaa5f2 Mon Sep 17 00:00:00 2001 From: Aleksandr Draganov Date: Wed, 1 May 2024 11:13:05 +0000 Subject: [PATCH 2/4] caching component for MySQL --- mysql/include/userver/cache/mysql/cache.hpp | 738 ++++++++++++++++++++ 1 file changed, 738 insertions(+) create mode 100644 mysql/include/userver/cache/mysql/cache.hpp diff --git a/mysql/include/userver/cache/mysql/cache.hpp b/mysql/include/userver/cache/mysql/cache.hpp new file mode 100644 index 000000000000..477c96e76236 --- /dev/null +++ b/mysql/include/userver/cache/mysql/cache.hpp @@ -0,0 +1,738 @@ +#pragma once + +/// @file userver/cache/mysql/cache.hpp +/// @brief @copybrief components::MySqlCache + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "userver/storages/mysql/dates.hpp" + +USERVER_NAMESPACE_BEGIN + +namespace components { + +// clang-format off + +/// @page Caching Component for MySQL +/// +/// A typical components::MySqlCache usage consists of trait definition: +/// +/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Trivial +/// +/// and registration of the component in components::ComponentList: +/// +/// @snippet cache/postgres_cache_test.cpp Pg Cache Trivial Usage +/// +/// See @ref scripts/docs/en/userver/caches.md for introduction into caches. +/// +/// +/// @section pg_cc_configuration Configuration +/// +/// components::MySqlCache static configuration file should have a PostgreSQL +/// component name specified in `pgcomponent` configuration parameter. +/// +/// Optionally the operation timeouts for cache loading can be specified. +/// +/// ### Avoiding memory leaks +/// components::CachingComponentBase +/// +/// Name | Description | Default value +/// ---- | ----------- | ------------- +/// full-update-op-timeout | timeout for a full update | 1m +/// incremental-update-op-timeout | timeout for an incremental update | 1s +/// update-correction | incremental update window adjustment | - (0 for caches with defined GetLastKnownUpdated) +/// chunk-size | number of rows to request from PostgreSQL via portals, 0 to fetch all rows in one request without portals | 1000 +/// +/// @section pg_cc_cache_policy Cache policy +/// +/// Cache policy is the template argument of components::MySqlCache component. +/// Please see the following code snippet for documentation. +/// +/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example +/// +/// The query can be a std::string. But due to non-guaranteed order of static +/// data members initialization, std::string should be returned from a static +/// member function, please see the following code snippet. +/// +/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example +/// +/// Policy may have static function GetLastKnownUpdated. It should be used +/// when new entries from database are taken via revision, identifier, or +/// anything else, but not timestamp of the last update. +/// If this function is supplied, new entries are taken from db with condition +/// 'WHERE kUpdatedField > GetLastKnownUpdated(cache_container)'. +/// Otherwise, condition is +/// 'WHERE kUpdatedField > last_update - correction_'. +/// See the following code snippet for an example of usage +/// +/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example +/// +/// In case one provides a custom CacheContainer within Policy, it is notified +/// of Update completion via its public member function OnWritesDone, if any. +/// See the following code snippet for an example of usage: +/// +/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example +/// +/// @section pg_cc_forward_declaration Forward Declaration +/// +/// To forward declare a cache you can forward declare a trait and +/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to +/// forward declare the cache value type. +/// +/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example +/// +/// ---------- +/// +/// @htmlonly
@endhtmlonly +/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨ +/// @htmlonly
@endhtmlonly + +// clang-format on + +namespace mysql_cache::detail { + +template +using ValueType = typename T::ValueType; +template +inline constexpr bool kHasValueType = meta::kIsDetected; + +template +using RawValueTypeImpl = typename T::RawValueType; +template +inline constexpr bool kHasRawValueType = meta::kIsDetected; +template +using RawValueType = meta::DetectedOr, RawValueTypeImpl, T>; + +template +auto ExtractValue(RawValueType&& raw) { + if constexpr (kHasRawValueType) { + return Convert(std::move(raw), + formats::parse::To>()); + } else { + return std::move(raw); + } +} + +// Component name in policy +template +using HasNameImpl = std::enable_if_t; +template +inline constexpr bool kHasName = meta::kIsDetected; + +// Component query in policy +template +using HasQueryImpl = decltype(T::kQuery); +template +inline constexpr bool kHasQuery = meta::kIsDetected; + +// Component GetQuery in policy +template +using HasGetQueryImpl = decltype(T::GetQuery()); +template +inline constexpr bool kHasGetQuery = meta::kIsDetected; + +// Component kWhere in policy +template +using HasWhere = decltype(T::kWhere); +template +inline constexpr bool kHasWhere = meta::kIsDetected; + +// Update field +template +using HasUpdatedField = decltype(T::kUpdatedField); +template +inline constexpr bool kHasUpdatedField = meta::kIsDetected; + +template +using WantIncrementalUpdates = + std::enable_if_t; +template +inline constexpr bool kWantIncrementalUpdates = + meta::kIsDetected; + +// Key member in policy +template +using KeyMemberTypeImpl = + std::decay_t>>; +template +inline constexpr bool kHasKeyMember = meta::kIsDetected; +template +using KeyMemberType = meta::DetectedType; + +// Data container for cache +template > +struct DataCacheContainer { + static_assert(meta::kIsStdHashable>, + "With default CacheContainer, key type must be std::hash-able"); + + using type = std::unordered_map, ValueType>; +}; + +template +struct DataCacheContainer< + T, USERVER_NAMESPACE::utils::void_t> { + using type = typename T::CacheContainer; +}; + +template +using DataCacheContainerType = typename DataCacheContainer::type; + +// We have to whitelist container types, for which we perform by-element +// copying, because it's not correct for certain custom containers. +template +inline constexpr bool kIsContainerCopiedByElement = + meta::kIsInstantiationOf || + meta::kIsInstantiationOf; + +template +std::unique_ptr CopyContainer( + const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations, + tracing::ScopeTime& scope) { + if constexpr (kIsContainerCopiedByElement) { + auto copy = std::make_unique(); + if constexpr (meta::kIsReservable) { + copy->reserve(container.size()); + } + + utils::CpuRelax relax{cpu_relax_iterations, &scope}; + for (const auto& kv : container) { + relax.Relax(); + copy->insert(kv); + } + return copy; + } else { + return std::make_unique(container); + } +} + +template +void CacheInsertOrAssign(Container& container, Value&& value, + const KeyMember& key_member, Args&&... /*args*/) { + // Args are only used to de-prioritize this default overload. + static_assert(sizeof...(Args) == 0); + // Copy 'key' to avoid aliasing issues in 'insert_or_assign'. + auto key = std::invoke(key_member, value); + container.insert_or_assign(std::move(key), std::forward(value)); +} + +template +using HasOnWritesDoneImpl = decltype(std::declval().OnWritesDone()); + +template +void OnWritesDone(T& container) { + if constexpr (meta::kIsDetected) { + container.OnWritesDone(); + } +} + +template +using HasCustomUpdatedImpl = + decltype(T::GetLastKnownUpdated(std::declval>())); + +template +inline constexpr bool kHasCustomUpdated = + meta::kIsDetected; + +template +using UpdatedFieldTypeImpl = typename T::UpdatedFieldType; +template +inline constexpr bool kHasUpdatedFieldType = + meta::kIsDetected; +template +using UpdatedFieldType = + meta::DetectedOr; + +template +constexpr bool CheckUpdatedFieldType() { + if constexpr (kHasUpdatedFieldType) { + static_assert( + std::is_same_v || + std::is_same_v || + kHasCustomUpdated, + "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint"); + } else { + static_assert(!kWantIncrementalUpdates, + "UpdatedFieldType must be explicitly specified when using " + "incremental updates"); + } + return true; +} + +// Cluster host type policy +template +using HasClusterHostTypeImpl = decltype(T::ClusterHostType); + +template +constexpr storages::mysql::ClusterHostType ClusterHostType() { + if constexpr (meta::kIsDetected) { + return T::ClusterHostType; + } else { + return storages::mysql::ClusterHostType::kPrimary; + } +} + +// May return null policy +template +using HasMayReturnNull = decltype(T::kMayReturnNull); + +template +constexpr bool MayReturnNull() { + if constexpr (meta::kIsDetected) { + return T::kMayReturnNull; + } else { + return false; + } +} + +template +struct PolicyChecker { + // Static assertions for cache traits + static_assert( + kHasName, + "The PosgreSQL cache policy must contain a static member `kName`"); + static_assert( + kHasValueType, + "The PosgreSQL cache policy must define a type alias `ValueType`"); + static_assert( + kHasKeyMember, + "The PostgreSQL cache policy must contain a static member `kKeyMember` " + "with a pointer to a data or a function member with the object's key"); + static_assert(kHasQuery || + kHasGetQuery, + "The PosgreSQL cache policy must contain a static data member " + "`kQuery` with a select statement or a static member function " + "`GetQuery` returning the query"); + static_assert(!(kHasQuery && + kHasGetQuery), + "The PosgreSQL cache policy must define `kQuery` or " + "`GetQuery`, not both"); + static_assert( + kHasUpdatedField, + "The PosgreSQL cache policy must contain a static member " + "`kUpdatedField`. If you don't want to use incremental updates, " + "please set its value to `nullptr`"); + static_assert(CheckUpdatedFieldType()); + + /* + static_assert(ClusterHostType() & + storages::postgres::kClusterHostRolesMask, + "Cluster host role must be specified for caching component, " + "please be more specific"); + */ + + static storages::mysql::Query GetQuery() { + if constexpr (kHasGetQuery) { + return MySqlCachePolicy::GetQuery(); + } else { + return MySqlCachePolicy::kQuery; + } + } + + using BaseType = + CachingComponentBase>; +}; + +inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1}; +inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1}; +inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0}; +inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10}; +inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2}; + +inline constexpr std::string_view kCopyStage = "copy_data"; +inline constexpr std::string_view kFetchStage = "fetch"; +inline constexpr std::string_view kParseStage = "parse"; + +inline constexpr std::size_t kDefaultChunkSize = 1000; +} // namespace mysql_cache::detail + +/// @ingroup userver_components +/// +/// @brief Caching component for PostgreSQL. See @ref pg_cache. +/// +/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md +template +class MySqlCache final + : public mysql_cache::detail::PolicyChecker::BaseType { + public: + // Type aliases + using PolicyType = MySqlCachePolicy; + using ValueType = mysql_cache::detail::ValueType; + using RawValueType = mysql_cache::detail::RawValueType; + using DataType = mysql_cache::detail::DataCacheContainerType; + using PolicyCheckerType = mysql_cache::detail::PolicyChecker; + using UpdatedFieldType = + mysql_cache::detail::UpdatedFieldType; + using BaseType = typename PolicyCheckerType::BaseType; + + // Calculated constants + constexpr static bool kIncrementalUpdates = + mysql_cache::detail::kWantIncrementalUpdates; + constexpr static auto kClusterHostTypeFlags = + mysql_cache::detail::ClusterHostType(); + constexpr static auto kName = PolicyType::kName; + + MySqlCache(const ComponentConfig&, const ComponentContext&); + ~MySqlCache() override; + + static yaml_config::Schema GetStaticConfigSchema(); + + private: + using CachedData = std::unique_ptr; + + UpdatedFieldType GetLastUpdated( + std::chrono::system_clock::time_point last_update, + const DataType& cache) const; + + void Update(cache::UpdateType type, + const std::chrono::system_clock::time_point& last_update, + const std::chrono::system_clock::time_point& now, + cache::UpdateStatisticsScope& stats_scope) override; + + bool MayReturnNull() const override; + + CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope); + void CacheResults(std::vector res, CachedData& data_cache, + cache::UpdateStatisticsScope& stats_scope, + tracing::ScopeTime& scope); + + static storages::mysql::Query GetAllQuery(); + static storages::mysql::Query GetDeltaQuery(); + + std::chrono::milliseconds ParseCorrection(const ComponentConfig& config); + + //std::vector clusters_; + std::vector> clusters_; + + const std::chrono::system_clock::duration correction_; + const std::chrono::milliseconds full_update_timeout_; + const std::chrono::milliseconds incremental_update_timeout_; + const std::size_t chunk_size_; + std::size_t cpu_relax_iterations_parse_{0}; + std::size_t cpu_relax_iterations_copy_{0}; +}; + +template +inline constexpr bool kHasValidate> = true; + +template +MySqlCache::MySqlCache(const ComponentConfig& config, + const ComponentContext& context) + : BaseType{config, context}, + correction_{ParseCorrection(config)}, + full_update_timeout_{ + config["full-update-op-timeout"].As( + mysql_cache::detail::kDefaultFullUpdateTimeout)}, + incremental_update_timeout_{ + config["incremental-update-op-timeout"].As( + mysql_cache::detail::kDefaultIncrementalUpdateTimeout)}, + chunk_size_{config["chunk-size"].As( + mysql_cache::detail::kDefaultChunkSize)} { + /* TODO + UINVARIANT( + !chunk_size_ || storages::postgres::Portal::IsSupportedByDriver(), + "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building " + "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."); + */ + if (this->GetAllowedUpdateTypes() == + cache::AllowedUpdateTypes::kFullAndIncremental && + !kIncrementalUpdates) { + throw std::logic_error( + "Incremental update support is requested in config but no update field " + "name is specified in traits of '" + + config.Name() + "' cache"); + } + if (correction_.count() < 0) { + throw std::logic_error( + "Refusing to set forward (negative) update correction requested in " + "config for '" + + config.Name() + "' cache"); + } + + const auto pg_alias = config["pgcomponent"].As(""); + /* TODO + if (pg_alias.empty()) { + throw storages::postgres::InvalidConfig{ + "No `pgcomponent` entry in configuration"}; + } + */ + auto& pg_cluster_comp = context.FindComponent(pg_alias); + const auto shard_count = 1; + clusters_.resize(shard_count); + for (size_t i = 0; i < shard_count; ++i) { + clusters_[i] = pg_cluster_comp.GetCluster(); + } + + LOG_INFO() << "Cache " << kName << " full update query `" + << GetAllQuery().GetStatement() << "` incremental update query `" + << GetDeltaQuery().GetStatement() << "`"; + + this->StartPeriodicUpdates(); +} + +template +MySqlCache::~MySqlCache() { + this->StopPeriodicUpdates(); +} + +template +storages::mysql::Query MySqlCache::GetAllQuery() { + storages::mysql::Query query = PolicyCheckerType::GetQuery(); + if constexpr (mysql_cache::detail::kHasWhere) { + return {fmt::format("{} where {}", query.GetStatement(), + MySqlCachePolicy::kWhere), + query.GetName()}; + } else { + return query; + } +} + +template +storages::mysql::Query MySqlCache::GetDeltaQuery() { + if constexpr (kIncrementalUpdates) { + storages::mysql::Query query = PolicyCheckerType::GetQuery(); + if constexpr (mysql_cache::detail::kHasWhere) { + return { + fmt::format("{} where ({}) and {} >= $1", query.GetStatement(), + MySqlCachePolicy::kWhere, PolicyType::kUpdatedField), + query.GetName()}; + } else { + return {fmt::format("{} where {} >= $1", query.GetStatement(), + PolicyType::kUpdatedField), + query.GetName()}; + } + } else { + return GetAllQuery(); + } +} + +template +std::chrono::milliseconds MySqlCache::ParseCorrection( + const ComponentConfig& config) { + static constexpr std::string_view kUpdateCorrection = "update-correction"; + if (mysql_cache::detail::kHasCustomUpdated || + this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) { + return config[kUpdateCorrection].As(0); + } else { + return config[kUpdateCorrection].As(); + } +} + +template +typename MySqlCache::UpdatedFieldType +MySqlCache::GetLastUpdated( + [[maybe_unused]] std::chrono::system_clock::time_point last_update, + const DataType& cache) const { + if constexpr (mysql_cache::detail::kHasCustomUpdated) { + return MySqlCachePolicy::GetLastKnownUpdated(cache); + } else { + return UpdatedFieldType{last_update - correction_}; + } +} + +template +void MySqlCache::Update( + cache::UpdateType type, + const std::chrono::system_clock::time_point& /*last_update*/, + const std::chrono::system_clock::time_point& /*now*/, + cache::UpdateStatisticsScope& stats_scope) { + if constexpr (!kIncrementalUpdates) { + type = cache::UpdateType::kFull; + } + const auto query = + (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery(); + /* todo + const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull) + ? full_update_timeout_ + : incremental_update_timeout_; + */ + // COPY current cached data + auto scope = tracing::Span::CurrentSpan().CreateScopeTime( + std::string{mysql_cache::detail::kCopyStage}); + auto data_cache = GetDataSnapshot(type, scope); + [[maybe_unused]] const auto old_size = data_cache->size(); + + scope.Reset(std::string{mysql_cache::detail::kFetchStage}); + + size_t changes = 0; + // Iterate clusters + // TODO + for (const std::shared_ptr& cluster : clusters_) { + if (chunk_size_ > 0) { + /*auto trx = cluster->Begin(kClusterHostTypeFlags); + auto portal = + trx.MakePortal(query, GetLastUpdated(last_update, *data_cache)); + while (portal) { + scope.Reset(std::string{mysql_cache::detail::kFetchStage}); + auto res = portal.Fetch(chunk_size_); + stats_scope.IncreaseDocumentsReadCount(res.Size()); + + scope.Reset(std::string{mysql_cache::detail::kParseStage}); + CacheResults(res, data_cache, stats_scope, scope); + changes += res.Size(); + } + trx.Commit();*/ + } else { + //bool has_parameter = query.GetStatement().find('$') != std::string::npos; + auto resultValues = cluster->Execute(userver::storages::mysql::ClusterHostType::kPrimary, query.GetStatement()).AsVector(); + stats_scope.IncreaseDocumentsReadCount(resultValues.size()); + + scope.Reset(std::string{mysql_cache::detail::kParseStage}); + CacheResults(resultValues, data_cache, stats_scope, scope); + changes += resultValues.size(); + } + } + + scope.Reset(); + + if constexpr (mysql_cache::detail::kIsContainerCopiedByElement) { + if (old_size > 0) { + const auto elapsed_copy = + scope.ElapsedTotal(std::string{mysql_cache::detail::kCopyStage}); + if (elapsed_copy > mysql_cache::detail::kCpuRelaxThreshold) { + cpu_relax_iterations_copy_ = static_cast( + static_cast(old_size) / + (elapsed_copy / mysql_cache::detail::kCpuRelaxInterval)); + LOG_TRACE() << "Elapsed time for copying " << kName << " " + << elapsed_copy.count() << " for " << changes + << " data items is over threshold. Will relax CPU every " + << cpu_relax_iterations_parse_ << " iterations"; + } + } + } + + if (changes > 0) { + const auto elapsed_parse = + scope.ElapsedTotal(std::string{mysql_cache::detail::kParseStage}); + if (elapsed_parse > mysql_cache::detail::kCpuRelaxThreshold) { + cpu_relax_iterations_parse_ = static_cast( + static_cast(changes) / + (elapsed_parse / mysql_cache::detail::kCpuRelaxInterval)); + LOG_TRACE() << "Elapsed time for parsing " << kName << " " + << elapsed_parse.count() << " for " << changes + << " data items is over threshold. Will relax CPU every " + << cpu_relax_iterations_parse_ << " iterations"; + } + } + if (changes > 0 || type == cache::UpdateType::kFull) { + // Set current cache + stats_scope.Finish(data_cache->size()); + mysql_cache::detail::OnWritesDone(*data_cache); + this->Set(std::move(data_cache)); + } else { + stats_scope.FinishNoChanges(); + } +} + +template +bool MySqlCache::MayReturnNull() const { + return mysql_cache::detail::MayReturnNull(); +} + +template +void MySqlCache::CacheResults( + std::vector res, CachedData& data_cache, + cache::UpdateStatisticsScope& stats_scope, tracing::ScopeTime& scope) { + auto values = res; + utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope}; + for (auto p = values.begin(); p != values.end(); ++p) { + relax.Relax(); + try { + using mysql_cache::detail::CacheInsertOrAssign; + CacheInsertOrAssign( + *data_cache, *p, + MySqlCachePolicy::kKeyMember); + } catch (const std::exception& e) { + stats_scope.IncreaseDocumentsParseFailures(1); + LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '" + << compiler::GetTypeName() << "': " << e.what(); + } + } +} + +template +typename MySqlCache::CachedData +MySqlCache::GetDataSnapshot(cache::UpdateType type, + tracing::ScopeTime& scope) { + if (type == cache::UpdateType::kIncremental) { + auto data = this->Get(); + if (data) { + return mysql_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, + scope); + } + } + return std::make_unique(); +} + +namespace impl { + +} // namespace impl + +template +yaml_config::Schema MySqlCache::GetStaticConfigSchema() { + using ParentType = + typename mysql_cache::detail::PolicyChecker::BaseType; + const static std::string schema = R"( +type: object +description: Caching component for MySQL derived from components::CachingComponentBase. +additionalProperties: false +properties: + full-update-op-timeout: + type: string + description: timeout for a full update + defaultDescription: 1m + incremental-update-op-timeout: + type: string + description: timeout for an incremental update + defaultDescription: 1s + update-correction: + type: string + description: incremental update window adjustment + defaultDescription: 0 for caches with defined GetLastKnownUpdated + chunk-size: + type: integer + description: number of rows to request from PostgreSQL, 0 to fetch all rows in one request + defaultDescription: 1000 + pgcomponent: + type: string + description: PostgreSQL component name + defaultDescription: "" +)"; + return yaml_config::MergeSchemas(schema); +} + +} // namespace components + +// namespace utils::impl::projected_set { +// +// template +// void CacheInsertOrAssign(Set& set, Value&& value, +// const KeyMember& /*key_member*/) { +// DoInsert(set, std::forward(value)); +// } +//} // namespace utils::impl::projected_set + +USERVER_NAMESPACE_END From d588caf260e327042bdd590bf40cc171ca2e4ebb Mon Sep 17 00:00:00 2001 From: Aleksandr Draganov Date: Mon, 13 May 2024 18:44:33 +0000 Subject: [PATCH 3/4] tests for caching component for MySQL --- mysql/include/userver/cache/mysql/cache.hpp | 26 +- mysql/src/cache/tests/mysql_cache_test.cpp | 353 ++++++++++++++++++ .../src/cache/tests/mysql_cache_test_fwd.hpp | 24 ++ 3 files changed, 389 insertions(+), 14 deletions(-) create mode 100644 mysql/src/cache/tests/mysql_cache_test.cpp create mode 100644 mysql/src/cache/tests/mysql_cache_test_fwd.hpp diff --git a/mysql/include/userver/cache/mysql/cache.hpp b/mysql/include/userver/cache/mysql/cache.hpp index 477c96e76236..daf6abbe08bb 100644 --- a/mysql/include/userver/cache/mysql/cache.hpp +++ b/mysql/include/userver/cache/mysql/cache.hpp @@ -503,8 +503,7 @@ storages::mysql::Query MySqlCache::GetAllQuery() { storages::mysql::Query query = PolicyCheckerType::GetQuery(); if constexpr (mysql_cache::detail::kHasWhere) { return {fmt::format("{} where {}", query.GetStatement(), - MySqlCachePolicy::kWhere), - query.GetName()}; + MySqlCachePolicy::kWhere)}; } else { return query; } @@ -517,12 +516,10 @@ storages::mysql::Query MySqlCache::GetDeltaQuery() { if constexpr (mysql_cache::detail::kHasWhere) { return { fmt::format("{} where ({}) and {} >= $1", query.GetStatement(), - MySqlCachePolicy::kWhere, PolicyType::kUpdatedField), - query.GetName()}; + MySqlCachePolicy::kWhere, PolicyType::kUpdatedField)}; } else { return {fmt::format("{} where {} >= $1", query.GetStatement(), - PolicyType::kUpdatedField), - query.GetName()}; + PolicyType::kUpdatedField)}; } } else { return GetAllQuery(); @@ -726,13 +723,14 @@ additionalProperties: false } // namespace components -// namespace utils::impl::projected_set { -// -// template -// void CacheInsertOrAssign(Set& set, Value&& value, -// const KeyMember& /*key_member*/) { -// DoInsert(set, std::forward(value)); -// } -//} // namespace utils::impl::projected_set +namespace utils::impl::projected_set { + +template +void CacheInsertOrAssign(Set& set, Value&& value, + const KeyMember& /*key_member*/) { + DoInsert(set, std::forward(value)); +} + +} // namespace utils::impl::projected_set USERVER_NAMESPACE_END diff --git a/mysql/src/cache/tests/mysql_cache_test.cpp b/mysql/src/cache/tests/mysql_cache_test.cpp new file mode 100644 index 000000000000..846f65247800 --- /dev/null +++ b/mysql/src/cache/tests/mysql_cache_test.cpp @@ -0,0 +1,353 @@ +#include "mysql_cache_test_fwd.hpp" + +#include + +#include +#include + +USERVER_NAMESPACE_BEGIN + +// This is a snippet for documentation +/*! [Pg Cache Policy Example] */ +namespace example { + +struct MyStructure { + int id = 0; + std::string bar{}; + std::chrono::system_clock::time_point updated; + + int get_id() const { return id; } +}; + +struct MySqlExamplePolicy { + // Name of caching policy component. + // + // Required: **yes** + static constexpr std::string_view kName = "my-pg-cache"; + + // Object type. + // + // Required: **yes** + using ValueType = MyStructure; + + // Key by which the object must be identified in cache. + // + // One of: + // - A pointer-to-member in the object + // - A pointer-to-member-function in the object that returns the key + // - A pointer-to-function that takes the object and returns the key + // - A lambda that takes the object and returns the key + // + // Required: **yes** + static constexpr auto kKeyMember = &MyStructure::id; + + // Data retrieve query. + // + // The query should not contain any clauses after the `from` clause. Either + // `kQuery` or `GetQuery` static member function must be defined. + // + // Required: **yes** + static constexpr const char* kQuery = + "select id, bar, updated from test.my_data"; + + // Name of the field containing timestamp of an object. + // + // To turn off incremental updates, set the value to `nullptr`. + // + // Required: **yes** + static constexpr const char* kUpdatedField = "updated"; + + // Type of the field containing timestamp of an object. + // + // Specifies whether updated field should be treated as a timestamp + // with or without timezone in database queries. + // + // Required: **yes** if incremental updates are used. + using UpdatedFieldType = storages::mysql::DateTime; + + // Where clause of the query. Either `kWhere` or `GetWhere` can be defined. + // + // Required: no + static constexpr const char* kWhere = "id > 10"; + + // Cache container type. + // + // It can be of any map type. The default is `unordered_map`, it is not + // necessary to declare the DataType alias if you are OK with + // `unordered_map`. + // The key type must match the type of kKeyMember. + // + // Required: no + using CacheContainer = std::unordered_map; + + // Cluster host selection flags to use when retrieving data. + // + // Default value is storages::MySql::ClusterHostType::kSlave, at least one + // cluster role must be present in flags. + // + // Required: no + static constexpr auto kClusterHostType = + storages::mysql::ClusterHostType::kPrimary; + + // Whether Get() is expected to return nullptr. + // + // Default value is false, Get() will throw an exception instead of + // returning nullptr. + // + // Required: no + static constexpr bool kMayReturnNull = false; +}; + +} // namespace example +/*! [Pg Cache Policy Example] */ + +namespace components::example { + +using USERVER_NAMESPACE::example::MyStructure; +using USERVER_NAMESPACE::example::MySqlExamplePolicy; + +struct MySqlExamplePolicy2 { + using ValueType = MyStructure; + static constexpr std::string_view kName = "my-pg-cache"; + static constexpr const char* kQuery = + "select id, bar, updated from test.my_data"; + static constexpr const char* kUpdatedField = ""; // Intentionally left blank + static constexpr auto kKeyMember = &MyStructure::get_id; + static constexpr auto kClusterHostType = + storages::mysql::ClusterHostType::kPrimary; +}; + +static_assert(mysql_cache::detail::kHasName); +static_assert(mysql_cache::detail::kHasName); +static_assert(mysql_cache::detail::kHasName); + +static_assert((std::is_same< + mysql_cache::detail::KeyMemberType, int>{})); +static_assert( + (std::is_same, + int>{})); + +static_assert(mysql_cache::detail::ClusterHostType() == + storages::mysql::ClusterHostType::kPrimary); +static_assert(mysql_cache::detail::ClusterHostType() == + storages::mysql::ClusterHostType::kPrimary); + +// Example of custom updated in cache +/*! [Pg Cache Policy Custom Updated Example] */ +struct MyStructureWithRevision { + int id = 0; + std::string bar{}; + std::chrono::system_clock::time_point updated; + int32_t revision = 0; + + int get_id() const { return id; } +}; + +class UserSpecificCache { + public: + void insert_or_assign(int, MyStructureWithRevision&& item) { + latest_revision_ = std::max(latest_revision_, item.revision); + } + // todo ivan check + void insert_or_assign(int, MyStructureWithRevision& item) { + latest_revision_ = std::max(latest_revision_, item.revision); + } + static size_t size() { return 0; } + + int GetLatestRevision() const { return latest_revision_; } + + private: + int latest_revision_ = 0; +}; + +struct MySqlExamplePolicy3 { + using ValueType = MyStructureWithRevision; + static constexpr std::string_view kName = "my-pg-cache"; + static constexpr const char* kQuery = + "select id, bar, revision from test.my_data"; + using CacheContainer = UserSpecificCache; + static constexpr const char* kUpdatedField = "revision"; + using UpdatedFieldType = int32_t; + static constexpr auto kKeyMember = &MyStructureWithRevision::get_id; + + // Function to get last known revision/time + // + // Optional + // If one wants to get cache updates not based on updated time, but, for + // example, based on revision > known_revision, this method should be used. + static int32_t GetLastKnownUpdated(const UserSpecificCache& container) { + return container.GetLatestRevision(); + } +}; +/*! [Pg Cache Policy Custom Updated Example] */ + +static_assert(mysql_cache::detail::kHasCustomUpdated); + +/*! [Pg Cache Policy GetQuery Example] */ +struct MySqlExamplePolicy4 { + static constexpr std::string_view kName = "my-pg-cache"; + + using ValueType = MyStructure; + + static constexpr auto kKeyMember = &MyStructure::id; + + static std::string GetQuery() { + return "select id, bar, updated from test.my_data"; + } + + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = + storages::mysql::DateTime; // no time zone (should be avoided) +}; +/*! [Pg Cache Policy GetQuery Example] */ + +static_assert(mysql_cache::detail::kHasGetQuery); + +/*! [Pg Cache Policy Trivial] */ +struct MySqlTrivialPolicy { + static constexpr std::string_view kName = "my-pg-cache"; + + using ValueType = MyStructure; + + static constexpr auto kKeyMember = &MyStructure::id; + + static constexpr const char* kQuery = "SELECT a, b, updated FROM test.data"; + + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = storages::mysql::DateTime; +}; +/*! [Pg Cache Policy Trivial] */ + +/*! [Pg Cache Policy Compound Primary Key Example] */ +struct MyStructureCompoundKey { + int id; + std::string bar; + + bool operator==(const MyStructureCompoundKey& other) const { + return id == other.id && bar == other.bar; + } +}; + +// Alternatively, specialize std::hash +struct MyStructureCompoundKeyHash { + size_t operator()(const MyStructureCompoundKey& key) const { + size_t seed = 0; + boost::hash_combine(seed, key.id); + boost::hash_combine(seed, key.bar); + return seed; + } +}; + +struct MySqlExamplePolicy5 { + static constexpr std::string_view kName = "my-pg-cache"; + + using ValueType = MyStructure; + + // maybe_unused is required due to a Clang bug + [[maybe_unused]] static constexpr auto kKeyMember = + [](const MyStructure& my_structure) { + return MyStructureCompoundKey{my_structure.id, my_structure.bar}; + }; + + static std::string GetQuery() { + return "select id, bar, updated from test.my_data"; + } + + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = storages::mysql::DateTime; + + using CacheContainer = std::unordered_map; +}; +/*! [Pg Cache Policy Compound Primary Key Example] */ + +static_assert(mysql_cache::detail::kHasGetQuery); + +/*! [Pg Cache Policy Custom Container With Write Notification Example] */ +class UserSpecificCacheWithWriteNotification { + public: + void insert_or_assign(int, MyStructure&&) {} + void insert_or_assign(int, MyStructure&) {} + static size_t size() { return 0; } + + void OnWritesDone() {} +}; +/*! [Pg Cache Policy Custom Container With Write Notification Example] */ + +// Tests a container with OnWritesDone +struct MySqlExamplePolicy6 { + static constexpr std::string_view kName = "my-pg-cache"; + using ValueType = MyStructure; + static constexpr auto kKeyMember = &MyStructure::id; + static constexpr const char* kQuery = + "select id, bar, updated from test.my_data"; + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = storages::mysql::DateTime; + using CacheContainer = UserSpecificCacheWithWriteNotification; +}; + +// Tests ProjectedUnorderedSet as container +struct MySqlExamplePolicy7 { + static constexpr std::string_view kName = "my-pg-cache"; + using ValueType = MyStructure; + static constexpr auto kKeyMember = &MyStructure::id; + static constexpr const char* kQuery = + "select id, bar, updated from test.my_data"; + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = storages::mysql::DateTime; + using CacheContainer = utils::ProjectedUnorderedSet; +}; + +// Instantiation test + +using MyCache1 = MySqlCache; +using MyCache2 = MySqlCache; +using MyCache3 = MySqlCache; +using MyCache4 = MySqlCache; +using MyTrivialCache = MySqlCache; +using MyCache5 = MySqlCache; +using MyCache6 = MySqlCache; +using MyCache7 = MySqlCache; + +// NB: field access required for actual instantiation +static_assert(MyCache1::kIncrementalUpdates); +static_assert(!MyCache2::kIncrementalUpdates); +static_assert(MyCache3::kIncrementalUpdates); +static_assert(MyCache4::kIncrementalUpdates); +static_assert(MyCache5::kIncrementalUpdates); +static_assert(MyCache6::kIncrementalUpdates); +static_assert(MyCache7::kIncrementalUpdates); + +namespace mysql = storages::mysql; +static_assert(MyCache1::kClusterHostTypeFlags == mysql::ClusterHostType::kPrimary); +static_assert(MyCache2::kClusterHostTypeFlags == mysql::ClusterHostType::kPrimary); +static_assert(MyCache3::kClusterHostTypeFlags == mysql::ClusterHostType::kPrimary); +static_assert(MyCache4::kClusterHostTypeFlags == mysql::ClusterHostType::kPrimary); +static_assert(MyCache5::kClusterHostTypeFlags == mysql::ClusterHostType::kPrimary); +static_assert(MyCache6::kClusterHostTypeFlags == mysql::ClusterHostType::kPrimary); +static_assert(MyCache7::kClusterHostTypeFlags == mysql::ClusterHostType::kPrimary); + +// Update() instantiation test +[[maybe_unused]] void VerifyUpdateCompiles( + const components::ComponentConfig& config, + const components::ComponentContext& context) { + MyCache1 cache1{config, context}; + MyCache2 cache2{config, context}; + MyCache3 cache3{config, context}; + MyCache4 cache4{config, context}; + MyTrivialCache my_trivial_cache{config, context}; + MyCache5 cache5{config, context}; + MyCache6 cache6{config, context}; + MyCache7 cache7{config, context}; +} + +inline auto SampleOfComponentRegistration() { + /*! [Pg Cache Trivial Usage] */ + return components::MinimalServerComponentList() + .Append>(); + /*! [Pg Cache Trivial Usage] */ +} + +} // namespace components::example + +USERVER_NAMESPACE_END diff --git a/mysql/src/cache/tests/mysql_cache_test_fwd.hpp b/mysql/src/cache/tests/mysql_cache_test_fwd.hpp new file mode 100644 index 000000000000..fa7ceb82105f --- /dev/null +++ b/mysql/src/cache/tests/mysql_cache_test_fwd.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include // for std::shared_ptr + +#include + +USERVER_NAMESPACE_BEGIN + +namespace example { // replace with a namespace of your trait + +struct MySqlExamplePolicy; +struct MyStructure; + +} // namespace example + +namespace caches { + +using MyCache1 = components::MySqlCache; +using MyCache1Data = std::shared_ptr; + +} // namespace caches +/*! [Pg Cache Fwd Example] */ + +USERVER_NAMESPACE_END \ No newline at end of file From 2d856458d081b35332d82e8b1b1284c6f16f6a37 Mon Sep 17 00:00:00 2001 From: Aleksandr Draganov Date: Mon, 13 May 2024 18:45:35 +0000 Subject: [PATCH 4/4] fix redefine --- .../userver/cache/base_postgres_cache.hpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/postgresql/include/userver/cache/base_postgres_cache.hpp b/postgresql/include/userver/cache/base_postgres_cache.hpp index bceb8d85dae3..970cae8bb168 100644 --- a/postgresql/include/userver/cache/base_postgres_cache.hpp +++ b/postgresql/include/userver/cache/base_postgres_cache.hpp @@ -711,14 +711,14 @@ yaml_config::Schema PostgreCache::GetStaticConfigSchema() { } // namespace components -namespace utils::impl::projected_set { - -template -void CacheInsertOrAssign(Set& set, Value&& value, - const KeyMember& /*key_member*/) { - DoInsert(set, std::forward(value)); -} - -} // namespace utils::impl::projected_set +//namespace utils::impl::projected_set { +// +//template +//void CacheInsertOrAssign(Set& set, Value&& value, +// const KeyMember& /*key_member*/) { +// DoInsert(set, std::forward(value)); +//} +// +//} // namespace utils::impl::projected_set USERVER_NAMESPACE_END