From 4bd1e1652808206a5d9516771bc71c8081405441 Mon Sep 17 00:00:00 2001 From: Murphy Date: Fri, 10 Jan 2025 18:25:02 +0800 Subject: [PATCH 1/3] adaptive enable the exchange compression with Thompson sampling Signed-off-by: Murphy --- be/src/common/config.h | 8 ++++ .../exchange/exchange_sink_operator.cpp | 35 ++++++++++---- .../exchange/exchange_sink_operator.h | 7 +-- be/src/serde/CMakeLists.txt | 1 + be/src/serde/compress_strategy.cpp | 46 +++++++++++++++++++ be/src/serde/compress_strategy.h | 46 +++++++++++++++++++ be/src/serde/encode_context.cpp | 8 ++-- be/src/util/compression/block_compression.cpp | 4 +- be/src/util/runtime_profile.h | 2 + .../common/util/CompressionUtils.java | 1 + .../com/starrocks/qe/SessionVariable.java | 4 +- gensrc/thrift/Types.thrift | 1 + 12 files changed, 145 insertions(+), 18 deletions(-) create mode 100644 be/src/serde/compress_strategy.cpp create mode 100644 be/src/serde/compress_strategy.h diff --git a/be/src/common/config.h b/be/src/common/config.h index 44532ee947994e..588d122e958e48 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -208,11 +208,19 @@ CONF_String(default_query_options, ""); // or 3x the number of cores. This keeps the cores busy without causing excessive // thrashing. CONF_Int32(num_threads_per_core, "3"); + +// Compression related parameters // If true, compresses tuple data in Serialize. CONF_Bool(compress_rowbatches, "true"); // Compress ratio when shuffle row_batches in network, not in storage engine. // If ratio is less than this value, use uncompressed data instead. CONF_mDouble(rpc_compress_ratio_threshold, "1.1"); +// Acceleration of LZ4 Compression, the larger the acceleration value, the faster the algorithm, but also the lesser the compression. 
+// Default 1, MIN=1, MAX=65537 +CONF_mInt32(lz4_acceleration, "1"); +// If compression ratio is larger than this threshold, consider it as a good compresiosn +CONF_mDouble(lz4_expected_compression_ratio, "2.1"); + // Serialize and deserialize each returned row batch. CONF_Bool(serialize_batch, "false"); // Interval between profile reports; in seconds. diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp index b4fd49eb39b70f..a2696a2a65f54d 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp @@ -31,6 +31,7 @@ #include "runtime/exec_env.h" #include "runtime/local_pass_through_buffer.h" #include "runtime/runtime_state.h" +#include "serde/compress_strategy.h" #include "serde/protobuf_serde.h" #include "service/brpc.h" #include "util/compression/block_compression.h" @@ -397,7 +398,13 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { } // Set compression type according to query options if (state->query_options().__isset.transmission_compression_type) { - _compress_type = CompressionUtils::to_compression_pb(state->query_options().transmission_compression_type); + TCompressionType::type type = state->query_options().transmission_compression_type; + if (type == TCompressionType::AUTO) { + _compress_type = CompressionTypePB::LZ4; + _compress_strategy = std::make_shared(); + } else { + _compress_type = CompressionUtils::to_compression_pb(state->query_options().transmission_compression_type); + } } else if (config::compress_rowbatches) { // If transmission_compression_type is not set, use compress_rowbatches to check if // compress transmitted data. 
@@ -436,7 +443,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { std::shuffle(_channel_indices.begin(), _channel_indices.end(), std::mt19937(std::random_device()())); _bytes_pass_through_counter = ADD_COUNTER(_unique_metrics, "BytesPassThrough", TUnit::BYTES); - _sender_input_bytes_counter = ADD_COUNTER(_unique_metrics, "SenderInputBytes", TUnit::BYTES); + _raw_input_bytes_counter = ADD_COUNTER(_unique_metrics, "RawInputBytes", TUnit::BYTES); _serialized_bytes_counter = ADD_COUNTER(_unique_metrics, "SerializedBytes", TUnit::BYTES); _compressed_bytes_counter = ADD_COUNTER(_unique_metrics, "CompressedBytes", TUnit::BYTES); @@ -666,10 +673,11 @@ void ExchangeSinkOperator::close(RuntimeState* state) { Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, bool* is_first_chunk, int num_receivers) { VLOG_ROW << "[ExchangeSinkOperator] serializing " << src->num_rows() << " rows"; - auto send_input_bytes = serde::ProtobufChunkSerde::max_serialized_size(*src, nullptr); - COUNTER_UPDATE(_sender_input_bytes_counter, send_input_bytes * num_receivers); + auto unserialized_bytes = src->bytes_usage(); + COUNTER_UPDATE(_raw_input_bytes_counter, unserialized_bytes * num_receivers); + int64_t serialization_time_ns = 0; { - SCOPED_TIMER(_serialize_chunk_timer); + ScopedTimer _timer(_serialize_chunk_timer); // We only serialize chunk meta for first chunk if (*is_first_chunk) { _encode_context = serde::EncodeContext::get_encode_context_shared_ptr(src->columns().size(), _encode_level); @@ -684,6 +692,7 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo RETURN_IF_ERROR(res); res->Swap(dst); } + serialization_time_ns = _timer.elapsed_time(); } if (_encode_context) { _encode_context->set_encode_levels_in_pb(dst); @@ -699,8 +708,12 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo } // try compress the ChunkPB data - if (_compress_codec != nullptr && serialized_size > 0) { - 
SCOPED_TIMER(_compress_timer); + bool use_compression = true; + if (_compress_strategy) { + use_compression = _compress_strategy->decide(); + } + if (_compress_codec != nullptr && serialized_size > 0 && use_compression) { + ScopedTimer _timer(_compress_timer); if (use_compression_pool(_compress_codec->type())) { Slice compressed_slice; @@ -720,14 +733,20 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo RETURN_IF_ERROR(_compress_codec->compress(input, &compressed_slice)); _compression_scratch.resize(compressed_slice.size); } + if (_compress_strategy) { + int64_t compression_time_ns = _timer.elapsed_time(); + _compress_strategy->feedback(serialized_size, _compression_scratch.size(), serialization_time_ns, + compression_time_ns); + } COUNTER_UPDATE(_compressed_bytes_counter, _compression_scratch.size() * num_receivers); double compress_ratio = (static_cast(serialized_size)) / _compression_scratch.size(); + VLOG_ROW << "chunk compression: uncompressed size: " << serialized_size + << ", compressed size: " << _compression_scratch.size(); if (LIKELY(compress_ratio > config::rpc_compress_ratio_threshold)) { dst->mutable_data()->swap(reinterpret_cast(_compression_scratch)); dst->set_compress_type(_compress_type); } - VLOG_ROW << "uncompressed size: " << serialized_size << ", compressed size: " << _compression_scratch.size(); } return Status::OK(); } diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.h b/be/src/exec/pipeline/exchange/exchange_sink_operator.h index f873f34f0af214..344be0013184ee 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.h +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.h @@ -28,6 +28,7 @@ #include "exec/pipeline/operator.h" #include "gen_cpp/data.pb.h" #include "gen_cpp/internal_service.pb.h" +#include "serde/compress_strategy.h" #include "serde/protobuf_serde.h" #include "util/raw_container.h" #include "util/runtime_profile.h" @@ -171,6 +172,8 @@ class ExchangeSinkOperator 
final : public Operator { CompressionTypePB _compress_type = CompressionTypePB::NO_COMPRESSION; const BlockCompressionCodec* _compress_codec = nullptr; + std::shared_ptr _encode_context = nullptr; + std::shared_ptr _compress_strategy; RuntimeProfile::Counter* _serialize_chunk_timer = nullptr; RuntimeProfile::Counter* _shuffle_hash_timer = nullptr; @@ -178,7 +181,7 @@ class ExchangeSinkOperator final : public Operator { RuntimeProfile::Counter* _shuffle_chunk_append_timer = nullptr; RuntimeProfile::Counter* _compress_timer = nullptr; RuntimeProfile::Counter* _bytes_pass_through_counter = nullptr; - RuntimeProfile::Counter* _sender_input_bytes_counter = nullptr; + RuntimeProfile::Counter* _raw_input_bytes_counter = nullptr; RuntimeProfile::Counter* _serialized_bytes_counter = nullptr; RuntimeProfile::Counter* _compressed_bytes_counter = nullptr; RuntimeProfile::HighWaterMarkCounter* _pass_through_buffer_peak_mem_usage = nullptr; @@ -208,8 +211,6 @@ class ExchangeSinkOperator final : public Operator { const std::vector& _output_columns; std::unique_ptr _shuffler; - - std::shared_ptr _encode_context = nullptr; }; class ExchangeSinkOperatorFactory final : public OperatorFactory { diff --git a/be/src/serde/CMakeLists.txt b/be/src/serde/CMakeLists.txt index 3b0ad2c80a0617..30d1137d479b6d 100644 --- a/be/src/serde/CMakeLists.txt +++ b/be/src/serde/CMakeLists.txt @@ -20,5 +20,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/serde") add_library(Serde STATIC encode_context.cpp column_array_serde.cpp + compress_strategy.cpp protobuf_serde.cpp ) diff --git a/be/src/serde/compress_strategy.cpp b/be/src/serde/compress_strategy.cpp new file mode 100644 index 00000000000000..74624858598643 --- /dev/null +++ b/be/src/serde/compress_strategy.cpp @@ -0,0 +1,46 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "serde/compress_strategy.h" + +#include + +#include "common/config.h" + +namespace starrocks::serde { + +CompressStrategy::CompressStrategy() : _rd(), _gen(_rd()), _dis(0.0, 1.0) {} + +void CompressStrategy::feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns, + uint64_t compression_time_ns) { + if (uncompressed_bytes == 0 || compressed_bytes == 0) { + return; + } + // TODO: consider the compression_time as reward factor + double compress_ratio = (uncompressed_bytes + 1) / (compressed_bytes + 1); + double reward_ratio = compress_ratio / config::lz4_expected_compression_ratio; + if (reward_ratio > 1.0) { + _alpha += reward_ratio * reward_ratio; + } else { + _beta += 1 / (reward_ratio * reward_ratio); + } +} + +bool CompressStrategy::decide() { + double theta = _dis(_gen); + double probability = _alpha / (_alpha + _beta); + return theta < probability; +} + +} // namespace starrocks::serde diff --git a/be/src/serde/compress_strategy.h b/be/src/serde/compress_strategy.h new file mode 100644 index 00000000000000..a0532d419722a9 --- /dev/null +++ b/be/src/serde/compress_strategy.h @@ -0,0 +1,46 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace starrocks::serde { + +// Compression strategy based on Thompson Sampling +class CompressStrategy { +public: + CompressStrategy(); + ~CompressStrategy() = default; + + // Give the feedback based on previous compression + void feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns, + uint64_t compression_time_ns); + + // Make the decision for the next compression + bool decide(); + +private: + // Generate a random number within [0, 1] + std::random_device _rd; + std::mt19937 _gen; + std::uniform_real_distribution _dis; + + // Thompson sampling parameters, biased for TRUE value + double _alpha = 3.0; // Success count + double _beta = 1.0; // Failure count +}; + +} // namespace starrocks::serde \ No newline at end of file diff --git a/be/src/serde/encode_context.cpp b/be/src/serde/encode_context.cpp index 6f9a58c8cd2421..53635be08a6180 100644 --- a/be/src/serde/encode_context.cpp +++ b/be/src/serde/encode_context.cpp @@ -50,10 +50,10 @@ void EncodeContext::_adjust(const int col_id) { _column_encode_level[col_id] = 0; } if (old_level != _column_encode_level[col_id] || _session_encode_level < -1) { - VLOG_ROW << "Old encode level " << old_level << " is changed to " << _column_encode_level[col_id] - << " because the first " << EncodeSamplingNum << " of " << _frequency << " in total " << _times - << " chunks' compression ratio is " << _encoded_bytes[col_id] * 1.0 / _raw_bytes[col_id] - << " higher than limit " << EncodeRatioLimit; + VLOG_ROW << "column " << col_id 
<< " encode_level changed from " << old_level << " to " + << _column_encode_level[col_id] << " because the first " << EncodeSamplingNum << " of " << _frequency + << " in total " << _times << " chunks' compression ratio is " + << _encoded_bytes[col_id] * 1.0 / _raw_bytes[col_id] << " higher than limit " << EncodeRatioLimit; } _encoded_bytes[col_id] = 0; _raw_bytes[col_id] = 0; diff --git a/be/src/util/compression/block_compression.cpp b/be/src/util/compression/block_compression.cpp index 26c6861ba887f0..443ea8324e7701 100644 --- a/be/src/util/compression/block_compression.cpp +++ b/be/src/util/compression/block_compression.cpp @@ -34,6 +34,8 @@ #include "util/compression/block_compression.h" +#include "common/config.h" + #ifdef __x86_64__ #include #endif @@ -148,7 +150,7 @@ class Lz4BlockCompression : public BlockCompressionCodec { } } - int32_t acceleration = 1; + int32_t acceleration = config::lz4_acceleration; size_t compressed_size = LZ4_compress_fast_continue(ctx, input.data, output->data, input.size, output->size, acceleration); diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index ceb6f34302c343..aa8c15d1159eed 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -735,6 +735,8 @@ class ScopedTimer { void start() { _sw.start(); } + int64_t elapsed_time() { return _sw.elapsed_time(); } + bool is_cancelled() { return _is_cancelled != nullptr && *_is_cancelled; } void UpdateCounter() { diff --git a/fe/fe-core/src/main/java/com/starrocks/common/util/CompressionUtils.java b/fe/fe-core/src/main/java/com/starrocks/common/util/CompressionUtils.java index ea37c2afd2edac..470c09193060a0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/util/CompressionUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/util/CompressionUtils.java @@ -36,6 +36,7 @@ public class CompressionUtils { private static final ImmutableMap T_COMPRESSION_BY_NAME = (new 
ImmutableSortedMap.Builder(String.CASE_INSENSITIVE_ORDER)) .put("NO_COMPRESSION", TCompressionType.NO_COMPRESSION) + .put("AUTO", TCompressionType.AUTO) .put("LZ4", TCompressionType.LZ4) .put("LZ4_FRAME", TCompressionType.LZ4_FRAME) .put("SNAPPY", TCompressionType.SNAPPY) diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 36058a0bf132be..f6e91070d2b65a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -1409,8 +1409,8 @@ public static MaterializedViewRewriteMode parse(String str) { @VariableMgr.VarAttr(name = NEW_PLANER_AGG_STAGE) private int newPlannerAggStage = SessionVariableConstants.AggregationStage.AUTO.ordinal(); - @VariableMgr.VarAttr(name = TRANSMISSION_COMPRESSION_TYPE) - private String transmissionCompressionType = "NO_COMPRESSION"; + @VariableMgr.VarAttr(name = TRANSMISSION_COMPRESSION_TYPE) + private String transmissionCompressionType = "AUTO"; // if a packet's size is larger than RPC_HTTP_MIN_SIZE, it will use RPC via http, as the std rpc has 2GB size limit. // the setting size is a bit smaller than 2GB, as the pre-computed serialization size of packets may not accurate. 
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index ddabbdf0309ac9..bfeec183e8916e 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -530,6 +530,7 @@ enum TCompressionType { BZIP2 = 10; LZO = 11; // Deprecated BROTLI = 12; + AUTO = 13; } enum TWriteQuorumType { From d259e2598c9f683a4b600fde32a69dfc1ad93e66 Mon Sep 17 00:00:00 2001 From: Murphy Date: Fri, 10 Jan 2025 19:54:05 +0800 Subject: [PATCH 2/3] fix Thompson sampling Signed-off-by: Murphy --- be/src/serde/compress_strategy.cpp | 14 ++++++++++---- be/src/serde/compress_strategy.h | 3 --- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/be/src/serde/compress_strategy.cpp b/be/src/serde/compress_strategy.cpp index 74624858598643..d40d428772eb52 100644 --- a/be/src/serde/compress_strategy.cpp +++ b/be/src/serde/compress_strategy.cpp @@ -15,12 +15,13 @@ #include "serde/compress_strategy.h" #include +#include #include "common/config.h" namespace starrocks::serde { -CompressStrategy::CompressStrategy() : _rd(), _gen(_rd()), _dis(0.0, 1.0) {} +CompressStrategy::CompressStrategy() : _gen(std::random_device()()) {} void CompressStrategy::feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns, uint64_t compression_time_ns) { @@ -38,9 +39,14 @@ void CompressStrategy::feedback(uint64_t uncompressed_bytes, uint64_t compressed } bool CompressStrategy::decide() { - double theta = _dis(_gen); - double probability = _alpha / (_alpha + _beta); - return theta < probability; + std::gamma_distribution gamma_alpha(_alpha, 1.0); + std::gamma_distribution gamma_beta(_beta, 1.0); + + double sample_alpha = gamma_alpha(_gen); + double sample_beta = gamma_beta(_gen); + + double theta = sample_alpha / (sample_alpha + sample_beta); + return theta > 0.5; } } // namespace starrocks::serde diff --git a/be/src/serde/compress_strategy.h b/be/src/serde/compress_strategy.h index a0532d419722a9..44b051dff49843 100644 --- 
a/be/src/serde/compress_strategy.h +++ b/be/src/serde/compress_strategy.h @@ -33,10 +33,7 @@ class CompressStrategy { bool decide(); private: - // Generate a random number within [0, 1] - std::random_device _rd; std::mt19937 _gen; - std::uniform_real_distribution _dis; // Thompson sampling parameters, biased for TRUE value double _alpha = 3.0; // Success count From 88f3996b60120d34dc2860bdd4a8c4a7cf378e04 Mon Sep 17 00:00:00 2001 From: Murphy Date: Sun, 12 Jan 2025 15:46:35 +0800 Subject: [PATCH 3/3] consider the compression speed Signed-off-by: Murphy --- be/src/common/config.h | 1 + be/src/serde/compress_strategy.cpp | 13 +++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 588d122e958e48..13c783e768e485 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -220,6 +220,7 @@ CONF_mDouble(rpc_compress_ratio_threshold, "1.1"); CONF_mInt32(lz4_acceleration, "1"); // If compression ratio is larger than this threshold, consider it as a good compresiosn CONF_mDouble(lz4_expected_compression_ratio, "2.1"); +CONF_mDouble(lz4_expected_compression_speed_mbps, "600"); // Serialize and deserialize each returned row batch. 
CONF_Bool(serialize_batch, "false"); diff --git a/be/src/serde/compress_strategy.cpp b/be/src/serde/compress_strategy.cpp index d40d428772eb52..cd447974e15862 100644 --- a/be/src/serde/compress_strategy.cpp +++ b/be/src/serde/compress_strategy.cpp @@ -25,16 +25,17 @@ CompressStrategy::CompressStrategy() : _gen(std::random_device()()) {} void CompressStrategy::feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns, uint64_t compression_time_ns) { - if (uncompressed_bytes == 0 || compressed_bytes == 0) { + if (uncompressed_bytes == 0 || compressed_bytes == 0 || compression_time_ns == 0) { return; } - // TODO: consider the compression_time as reward factor - double compress_ratio = (uncompressed_bytes + 1) / (compressed_bytes + 1); - double reward_ratio = compress_ratio / config::lz4_expected_compression_ratio; + double compress_speed = static_cast<double>(uncompressed_bytes) / compression_time_ns * (1e9 / 1024 / 1024); // MB/s + double compress_ratio = static_cast<double>(uncompressed_bytes) / compressed_bytes; // floating-point division + double reward_ratio = (compress_ratio / config::lz4_expected_compression_ratio) * + (compress_speed / config::lz4_expected_compression_speed_mbps); if (reward_ratio > 1.0) { - _alpha += reward_ratio * reward_ratio; + _alpha += 1; } else { - _beta += 1 / (reward_ratio * reward_ratio); + _beta += 1; } }