Skip to content

Commit

Permalink
[Enhancement] adaptive enable the exchange compression (#54956)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
(cherry picked from commit b92f139)
  • Loading branch information
murphyatwork authored and mergify[bot] committed Jan 13, 2025
1 parent 8535436 commit e0dbf35
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 18 deletions.
9 changes: 9 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,20 @@ CONF_String(default_query_options, "");
// or 3x the number of cores. This keeps the cores busy without causing excessive
// thrashing.
CONF_Int32(num_threads_per_core, "3");

// Compression related parameters
// If true, compresses tuple data in Serialize.
CONF_Bool(compress_rowbatches, "true");
// Compress ratio when shuffle row_batches in network, not in storage engine.
// If ratio is less than this value, use uncompressed data instead.
CONF_mDouble(rpc_compress_ratio_threshold, "1.1");
// Acceleration factor handed to LZ4_compress_fast_continue: the larger the value, the faster the
// algorithm runs, but the lower the compression it achieves.
// Default 1, MIN=1, MAX=65537
CONF_mInt32(lz4_acceleration, "1");
// Expected compression ratio (uncompressed size / compressed size) used by the adaptive
// compression strategy (serde::CompressStrategy). The measured ratio is divided by this value
// when a compression attempt is scored.
CONF_mDouble(lz4_expected_compression_ratio, "2.1");
// Expected compression throughput in MB/s used by the adaptive compression strategy. The measured
// speed is divided by this value; an attempt is considered good only when
// (ratio / expected_ratio) * (speed / expected_speed) is larger than 1.0.
CONF_mDouble(lz4_expected_compression_speed_mbps, "600");

// Serialize and deserialize each returned row batch.
CONF_Bool(serialize_batch, "false");
// Interval between profile reports; in seconds.
Expand Down
35 changes: 27 additions & 8 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "runtime/exec_env.h"
#include "runtime/local_pass_through_buffer.h"
#include "runtime/runtime_state.h"
#include "serde/compress_strategy.h"
#include "serde/protobuf_serde.h"
#include "service/brpc.h"
#include "util/compression/block_compression.h"
Expand Down Expand Up @@ -397,7 +398,13 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
}
// Set compression type according to query options
if (state->query_options().__isset.transmission_compression_type) {
_compress_type = CompressionUtils::to_compression_pb(state->query_options().transmission_compression_type);
TCompressionType::type type = state->query_options().transmission_compression_type;
if (type == TCompressionType::AUTO) {
_compress_type = CompressionTypePB::LZ4;
_compress_strategy = std::make_shared<serde::CompressStrategy>();
} else {
_compress_type = CompressionUtils::to_compression_pb(state->query_options().transmission_compression_type);
}
} else if (config::compress_rowbatches) {
// If transmission_compression_type is not set, use compress_rowbatches to check if
// compress transmitted data.
Expand Down Expand Up @@ -436,7 +443,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
std::shuffle(_channel_indices.begin(), _channel_indices.end(), std::mt19937(std::random_device()()));

_bytes_pass_through_counter = ADD_COUNTER(_unique_metrics, "BytesPassThrough", TUnit::BYTES);
_sender_input_bytes_counter = ADD_COUNTER(_unique_metrics, "SenderInputBytes", TUnit::BYTES);
_raw_input_bytes_counter = ADD_COUNTER(_unique_metrics, "RawInputBytes", TUnit::BYTES);
_serialized_bytes_counter = ADD_COUNTER(_unique_metrics, "SerializedBytes", TUnit::BYTES);
_compressed_bytes_counter = ADD_COUNTER(_unique_metrics, "CompressedBytes", TUnit::BYTES);

Expand Down Expand Up @@ -666,10 +673,11 @@ void ExchangeSinkOperator::close(RuntimeState* state) {

Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, bool* is_first_chunk, int num_receivers) {
VLOG_ROW << "[ExchangeSinkOperator] serializing " << src->num_rows() << " rows";
auto send_input_bytes = serde::ProtobufChunkSerde::max_serialized_size(*src, nullptr);
COUNTER_UPDATE(_sender_input_bytes_counter, send_input_bytes * num_receivers);
auto unserialized_bytes = src->bytes_usage();
COUNTER_UPDATE(_raw_input_bytes_counter, unserialized_bytes * num_receivers);
int64_t serialization_time_ns = 0;
{
SCOPED_TIMER(_serialize_chunk_timer);
ScopedTimer<MonotonicStopWatch> _timer(_serialize_chunk_timer);
// We only serialize chunk meta for first chunk
if (*is_first_chunk) {
_encode_context = serde::EncodeContext::get_encode_context_shared_ptr(src->columns().size(), _encode_level);
Expand All @@ -684,6 +692,7 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo
RETURN_IF_ERROR(res);
res->Swap(dst);
}
serialization_time_ns = _timer.elapsed_time();
}
if (_encode_context) {
_encode_context->set_encode_levels_in_pb(dst);
Expand All @@ -699,8 +708,12 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo
}

// try compress the ChunkPB data
if (_compress_codec != nullptr && serialized_size > 0) {
SCOPED_TIMER(_compress_timer);
bool use_compression = true;
if (_compress_strategy) {
use_compression = _compress_strategy->decide();
}
if (_compress_codec != nullptr && serialized_size > 0 && use_compression) {
ScopedTimer<MonotonicStopWatch> _timer(_compress_timer);

if (use_compression_pool(_compress_codec->type())) {
Slice compressed_slice;
Expand All @@ -720,14 +733,20 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo
RETURN_IF_ERROR(_compress_codec->compress(input, &compressed_slice));
_compression_scratch.resize(compressed_slice.size);
}
if (_compress_strategy) {
int64_t compression_time_ns = _timer.elapsed_time();
_compress_strategy->feedback(serialized_size, _compression_scratch.size(), serialization_time_ns,
compression_time_ns);
}

COUNTER_UPDATE(_compressed_bytes_counter, _compression_scratch.size() * num_receivers);
double compress_ratio = (static_cast<double>(serialized_size)) / _compression_scratch.size();
VLOG_ROW << "chunk compression: uncompressed size: " << serialized_size
<< ", compressed size: " << _compression_scratch.size();
if (LIKELY(compress_ratio > config::rpc_compress_ratio_threshold)) {
dst->mutable_data()->swap(reinterpret_cast<std::string&>(_compression_scratch));
dst->set_compress_type(_compress_type);
}
VLOG_ROW << "uncompressed size: " << serialized_size << ", compressed size: " << _compression_scratch.size();
}
return Status::OK();
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "exec/pipeline/operator.h"
#include "gen_cpp/data.pb.h"
#include "gen_cpp/internal_service.pb.h"
#include "serde/compress_strategy.h"
#include "serde/protobuf_serde.h"
#include "util/raw_container.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -171,14 +172,16 @@ class ExchangeSinkOperator final : public Operator {

CompressionTypePB _compress_type = CompressionTypePB::NO_COMPRESSION;
const BlockCompressionCodec* _compress_codec = nullptr;
std::shared_ptr<serde::EncodeContext> _encode_context = nullptr;
std::shared_ptr<serde::CompressStrategy> _compress_strategy;

RuntimeProfile::Counter* _serialize_chunk_timer = nullptr;
RuntimeProfile::Counter* _shuffle_hash_timer = nullptr;
RuntimeProfile::Counter* _shuffle_chunk_append_counter = nullptr;
RuntimeProfile::Counter* _shuffle_chunk_append_timer = nullptr;
RuntimeProfile::Counter* _compress_timer = nullptr;
RuntimeProfile::Counter* _bytes_pass_through_counter = nullptr;
RuntimeProfile::Counter* _sender_input_bytes_counter = nullptr;
RuntimeProfile::Counter* _raw_input_bytes_counter = nullptr;
RuntimeProfile::Counter* _serialized_bytes_counter = nullptr;
RuntimeProfile::Counter* _compressed_bytes_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _pass_through_buffer_peak_mem_usage = nullptr;
Expand Down Expand Up @@ -208,8 +211,6 @@ class ExchangeSinkOperator final : public Operator {
const std::vector<int32_t>& _output_columns;

std::unique_ptr<Shuffler> _shuffler;

std::shared_ptr<serde::EncodeContext> _encode_context = nullptr;
};

class ExchangeSinkOperatorFactory final : public OperatorFactory {
Expand Down
1 change: 1 addition & 0 deletions be/src/serde/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/serde")
add_library(Serde STATIC
encode_context.cpp
column_array_serde.cpp
compress_strategy.cpp
protobuf_serde.cpp
)
53 changes: 53 additions & 0 deletions be/src/serde/compress_strategy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "serde/compress_strategy.h"

#include <cmath>
#include <random>

#include "common/config.h"

namespace starrocks::serde {

CompressStrategy::CompressStrategy() : _gen(std::random_device()()) {}

// Record the outcome of one compression attempt and update the Thompson-sampling counters.
//
// uncompressed_bytes:    size of the payload before compression
// compressed_bytes:      size of the payload after compression
// serialization_time_ns: time spent serializing the payload (currently not part of the score)
// compression_time_ns:   time spent compressing the payload
//
// The attempt counts as a success (_alpha) when the measured ratio and speed, each normalised by
// its configured expectation, multiply to more than 1.0; otherwise it counts as a failure (_beta).
// Samples with a zero size or a zero duration carry no usable signal and are ignored.
void CompressStrategy::feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns,
                                uint64_t compression_time_ns) {
    if (uncompressed_bytes == 0 || compressed_bytes == 0 || compression_time_ns == 0) {
        return;
    }

    // Convert to double BEFORE dividing. With integer operands the quotient is truncated:
    // bytes-per-nanosecond is usually below 1, so the speed collapsed to 0, and a ratio such as
    // 2.9 was rounded down to 2, which made almost every sample look like a failure.
    const auto raw_bytes = static_cast<double>(uncompressed_bytes);
    const auto packed_bytes = static_cast<double>(compressed_bytes);
    const auto elapsed_ns = static_cast<double>(compression_time_ns);

    const double compress_speed = raw_bytes / elapsed_ns * (1e9 / 1024 / 1024); // MB/s
    const double compress_ratio = raw_bytes / packed_bytes;
    const double reward_ratio = (compress_ratio / config::lz4_expected_compression_ratio) *
                                (compress_speed / config::lz4_expected_compression_speed_mbps);
    if (reward_ratio > 1.0) {
        _alpha += 1;
    } else {
        _beta += 1;
    }
}

// Decide whether the next payload should be compressed.
//
// Draws one sample from Gamma(_alpha, 1) and one from Gamma(_beta, 1); their normalised share
// x / (x + y) is the sampled success probability. Compression is chosen when that probability
// exceeds one half.
bool CompressStrategy::decide() {
    std::gamma_distribution<double> success_dist(_alpha, 1.0);
    std::gamma_distribution<double> failure_dist(_beta, 1.0);

    // The success draw is taken first so the generator is consumed in a fixed order.
    const double success_draw = success_dist(_gen);
    const double failure_draw = failure_dist(_gen);

    const double success_share = success_draw / (success_draw + failure_draw);
    return success_share > 0.5;
}

} // namespace starrocks::serde
43 changes: 43 additions & 0 deletions be/src/serde/compress_strategy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstdint>
#include <random>

namespace starrocks::serde {

// Adaptive compress / don't-compress switch based on Thompson Sampling.
// Callers ask decide() before compressing and report the result of each compression
// through feedback().
class CompressStrategy {
public:
    CompressStrategy();
    ~CompressStrategy() = default;

    // Report the sizes and timings observed for the previous compression so that
    // later decisions can take them into account.
    void feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns,
                  uint64_t compression_time_ns);

    // Returns true when the next payload should be compressed.
    bool decide();

private:
    // Random number generator used for sampling.
    std::mt19937 _gen;

    // Thompson sampling parameters; the initial values are biased towards compressing.
    double _alpha{3.0}; // Success count
    double _beta{1.0};  // Failure count
};

} // namespace starrocks::serde
8 changes: 4 additions & 4 deletions be/src/serde/encode_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ void EncodeContext::_adjust(const int col_id) {
_column_encode_level[col_id] = 0;
}
if (old_level != _column_encode_level[col_id] || _session_encode_level < -1) {
VLOG_ROW << "Old encode level " << old_level << " is changed to " << _column_encode_level[col_id]
<< " because the first " << EncodeSamplingNum << " of " << _frequency << " in total " << _times
<< " chunks' compression ratio is " << _encoded_bytes[col_id] * 1.0 / _raw_bytes[col_id]
<< " higher than limit " << EncodeRatioLimit;
VLOG_ROW << "column " << col_id << " encode_level changed from " << old_level << " to "
<< _column_encode_level[col_id] << " because the first " << EncodeSamplingNum << " of " << _frequency
<< " in total " << _times << " chunks' compression ratio is "
<< _encoded_bytes[col_id] * 1.0 / _raw_bytes[col_id] << " higher than limit " << EncodeRatioLimit;
}
_encoded_bytes[col_id] = 0;
_raw_bytes[col_id] = 0;
Expand Down
4 changes: 3 additions & 1 deletion be/src/util/compression/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

#include "util/compression/block_compression.h"

#include "common/config.h"

#ifdef __x86_64__
#include <libdeflate.h>
#endif
Expand Down Expand Up @@ -148,7 +150,7 @@ class Lz4BlockCompression : public BlockCompressionCodec {
}
}

int32_t acceleration = 1;
int32_t acceleration = config::lz4_acceleration;
size_t compressed_size =
LZ4_compress_fast_continue(ctx, input.data, output->data, input.size, output->size, acceleration);

Expand Down
2 changes: 2 additions & 0 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,8 @@ class ScopedTimer {

void start() { _sw.start(); }

int64_t elapsed_time() { return _sw.elapsed_time(); }

bool is_cancelled() { return _is_cancelled != nullptr && *_is_cancelled; }

void UpdateCounter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class CompressionUtils {
private static final ImmutableMap<String, TCompressionType> T_COMPRESSION_BY_NAME =
(new ImmutableSortedMap.Builder<String, TCompressionType>(String.CASE_INSENSITIVE_ORDER))
.put("NO_COMPRESSION", TCompressionType.NO_COMPRESSION)
.put("AUTO", TCompressionType.AUTO)
.put("LZ4", TCompressionType.LZ4)
.put("LZ4_FRAME", TCompressionType.LZ4_FRAME)
.put("SNAPPY", TCompressionType.SNAPPY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1400,8 +1400,8 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = NEW_PLANER_AGG_STAGE)
private int newPlannerAggStage = SessionVariableConstants.AggregationStage.AUTO.ordinal();

@VariableMgr.VarAttr(name = TRANSMISSION_COMPRESSION_TYPE)
private String transmissionCompressionType = "NO_COMPRESSION";
@VariableMgr.VarAttr(name = TRANSMISSION_COMPRESSION_TYPE)
private String transmissionCompressionType = "AUTO";

// if a packet's size is larger than RPC_HTTP_MIN_SIZE, it will use RPC via http, as the std rpc has 2GB size limit.
// the setting size is a bit smaller than 2GB, as the pre-computed serialization size of packets may not accurate.
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/Types.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ enum TCompressionType {
BZIP2 = 10;
LZO = 11; // Deprecated
BROTLI = 12;
AUTO = 13;
}

enum TWriteQuorumType {
Expand Down

0 comments on commit e0dbf35

Please sign in to comment.