[Enhancement] Stream load/routine load support load channel profile #55490

Open · wants to merge 2 commits into main
16 changes: 13 additions & 3 deletions be/src/exec/tablet_sink.cpp
@@ -154,14 +154,24 @@ Status OlapTableSink::init(const TDataSink& t_sink, RuntimeState* state) {
_colocate_mv_index = table_sink.enable_colocate_mv_index && config::enable_load_colocate_mv;
}

// Query context is only available for pipeline engine
auto query_ctx = state->query_ctx();
if (query_ctx) {
if (state->query_ctx()) {
// Query context is only available for pipeline engine (insert/broker load)
auto query_ctx = state->query_ctx();
_load_channel_profile_config.set_enable_profile(query_ctx->get_enable_profile_flag());
_load_channel_profile_config.set_big_query_profile_threshold_ns(
query_ctx->get_big_query_profile_threshold_ns());
_load_channel_profile_config.set_runtime_profile_report_interval_ns(
query_ctx->get_runtime_profile_report_interval_ns());
} else {
// For non-pipeline engine (stream load/routine load), get the profile config from query options
auto& query_options = state->query_options();
if (query_options.__isset.enable_profile) {
_load_channel_profile_config.set_enable_profile(query_options.enable_profile);
}
if (query_options.__isset.load_profile_collect_second) {
_load_channel_profile_config.set_big_query_profile_threshold_ns(query_options.load_profile_collect_second *
1e9);
}
}

return Status::OK();
14 changes: 10 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
@@ -140,7 +140,8 @@ class FragmentExecState {
std::shared_ptr<RuntimeState> runtime_state() { return _runtime_state; }

private:
void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);
void coordinator_callback(const Status& status, RuntimeProfile* profile, RuntimeProfile* load_channel_profile,
bool done);

std::shared_ptr<RuntimeState> _runtime_state = nullptr;

@@ -169,8 +170,9 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id, const TUniqueId&
_backend_num(backend_num),
_exec_env(exec_env),
_coord_addr(coord_addr),
_executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback), this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)) {
_executor(exec_env,
std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback), this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)) {
_start_time = DateTimeValue::local_time();
}

@@ -230,7 +232,8 @@ std::string FragmentExecState::to_http_path(const std::string& file_name) {
// it is only invoked from the executor's reporting thread.
// Also, the reported status will always reflect the most recent execution status,
// including the final status when execution finishes.
void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile, bool done) {
void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
RuntimeProfile* load_channel_profile, bool done) {
DCHECK(status.ok() || done); // if !status.ok() => done
Status exec_status = update_status(status);

@@ -256,6 +259,9 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
profile->to_thrift(&params.profile);
params.__isset.profile = true;

load_channel_profile->to_thrift(&params.load_channel_profile);
params.__isset.load_channel_profile = true;

if (!runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
for (auto& it : runtime_state->output_files()) {
2 changes: 1 addition & 1 deletion be/src/runtime/plan_fragment_executor.cpp
@@ -320,7 +320,7 @@ void PlanFragmentExecutor::send_report(bool done) {
// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
_report_status_cb(status, profile(), done || !status.ok());
_report_status_cb(status, profile(), _runtime_state->load_channel_profile(), done || !status.ok());
}

Status PlanFragmentExecutor::_get_next_internal_vectorized(ChunkPtr* chunk) {
4 changes: 3 additions & 1 deletion be/src/runtime/plan_fragment_executor.h
@@ -86,7 +86,9 @@ class PlanFragmentExecutor {
// Note: this does not take a const RuntimeProfile&, because it might need to call
// functions like PrettyPrint() or to_thrift(), neither of which is const
// because they take locks.
typedef std::function<void(const Status& status, RuntimeProfile* profile, bool done)> report_status_callback;
typedef std::function<void(const Status& status, RuntimeProfile* profile, RuntimeProfile* load_channel_profile,
bool done)>
report_status_callback;

// if report_status_cb is not empty, is used to report the accumulated profile
// information periodically during execution open().
@@ -1107,7 +1107,7 @@ public void collectProfile(boolean isAborted) {
coord.collectProfileSync();
profile.addChild(coord.buildQueryProfile(true));
} else {
profile.addChild(coord.getQueryProfile());
profile.addChild(coord.buildQueryProfile(true));
}
}

@@ -167,6 +167,10 @@ public void initFragmentProfiles(int numFragments) {
}
}

public List<RuntimeProfile> getFragmentProfiles() {
return fragmentProfiles;
}

public List<String> getDeltaUrls() {
return deltaUrls;
}
@@ -405,9 +409,12 @@ public void updateLoadInformation(FragmentInstanceExecState execState, TReportEx
}

public RuntimeProfile buildQueryProfile(boolean needMerge) {
if (!needMerge || !jobSpec.isEnablePipeline()) {
if (!needMerge) {
return queryProfile;
}
if (!jobSpec.isEnablePipeline()) {
return mergeNonPipelineProfile();
}

RuntimeProfile newQueryProfile = new RuntimeProfile(queryProfile.getName());
long start = System.nanoTime();
@@ -616,6 +623,21 @@ public RuntimeProfile buildQueryProfile(boolean needMerge) {
return newQueryProfile;
}

RuntimeProfile mergeNonPipelineProfile() {
if (loadChannelProfile.isEmpty()) {
return queryProfile;
}
RuntimeProfile newQueryProfile = new RuntimeProfile(queryProfile.getName());
newQueryProfile.copyAllInfoStringsFrom(queryProfile, null);
newQueryProfile.copyAllCountersFrom(queryProfile);
for (RuntimeProfile fragmentProfile : fragmentProfiles) {
newQueryProfile.addChild(fragmentProfile);
}
Optional<RuntimeProfile> mergedLoadChannelProfile = mergeLoadChannelProfile();
mergedLoadChannelProfile.ifPresent(newQueryProfile::addChild);
return newQueryProfile;
}

Optional<RuntimeProfile> mergeLoadChannelProfile() {
if (loadChannelProfile.isEmpty()) {
return Optional.empty();
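For orientation, here is a minimal, hedged sketch of how an FE-side caller could consume the merged profile after this change. Only `buildQueryProfile`, `getChild`, and the `"Fragment 0"` / `"LoadChannel"` child names come from the hunks above and the test further below; the wrapper class, method name, and variable names are illustrative assumptions, and package imports are omitted.

```java
// Sketch only: assumes StarRocks' RuntimeProfile / QueryRuntimeProfile classes
// are available; imports and error handling are omitted.
class LoadProfileSketch {
    // For a non-pipeline job (stream load / routine load), buildQueryProfile(true)
    // now routes through mergeNonPipelineProfile(): the returned tree keeps the
    // query-level info strings and counters, and gains one child per fragment
    // plus a single merged "LoadChannel" child reported by the BEs.
    RuntimeProfile collect(QueryRuntimeProfile profile) {
        RuntimeProfile merged = profile.buildQueryProfile(true);
        RuntimeProfile loadChannel = merged.getChild("LoadChannel"); // null if no BE reported one
        return loadChannel != null ? loadChannel : merged;
    }
}
```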
@@ -95,6 +95,8 @@ public long getWarehouseId() {

private boolean incrementalScanRanges = false;

private boolean isSyncStreamLoad = false;

public static class Factory {
private Factory() {
}
@@ -303,6 +305,7 @@ public static JobSpec fromSyncStreamLoadSpec(StreamLoadPlanner planner) {
.enablePipeline(false)
.resourceGroup(null)
.warehouseId(planner.getWarehouseId())
.setSyncStreamLoad()
.build();
}

@@ -523,7 +526,7 @@ public boolean hasOlapTableSink() {
return true;
}
}
return false;
return isSyncStreamLoad;
}

public static class Builder {
@@ -621,6 +624,11 @@ private Builder setPlanProtocol(String planProtocol) {
return this;
}

private Builder setSyncStreamLoad() {
instance.isSyncStreamLoad = true;
return this;
}

/**
* Whether it can use pipeline engine.
*
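A short, hedged illustration of what the new `isSyncStreamLoad` flag buys: for a sync stream load the existing scan over the JobSpec's fragments does not find an OlapTableSink, so `hasOlapTableSink()` now reports the flag explicitly, which is what the profile machinery keys on (the test below mocks `hasOlapTableSink()` to true). The `planner` variable and the assertion style are illustrative assumptions.

```java
// Sketch only: "planner" is an already-built StreamLoadPlanner (assumption),
// and the factory is assumed to be invoked as JobSpec.Factory.fromSyncStreamLoadSpec.
JobSpec spec = JobSpec.Factory.fromSyncStreamLoadSpec(planner);
// setSyncStreamLoad() is called inside the factory, so this now returns true
// even though none of the FE-side fragments carries an OlapTableSink.
assert spec.hasOlapTableSink();
```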
@@ -60,7 +60,7 @@ public void setup() {
}

@Test
public void testLoadChannelProfile() {
public void testMergeLoadChannelProfile() {
new Expectations() {
{
jobSpec.hasOlapTableSink();
@@ -75,8 +75,36 @@ public void testLoadChannelProfile() {
profile.updateLoadChannelProfile(reportExecStatusParams);
Optional<RuntimeProfile> optional = profile.mergeLoadChannelProfile();
Assert.assertTrue(optional.isPresent());
RuntimeProfile mergedProfile = optional.get();
verifyMergedLoadChannelProfile(optional.get());
}

@Test
public void testBuildNonPipelineQueryProfile() {
new Expectations() {
{
jobSpec.hasOlapTableSink();
result = true;
minTimes = 0;
jobSpec.isEnablePipeline();
result = false;
minTimes = 0;
}
};

QueryRuntimeProfile profile = new QueryRuntimeProfile(connectContext, jobSpec, false);
profile.initFragmentProfiles(1);
TReportExecStatusParams reportExecStatusParams = buildReportStatus();
profile.updateLoadChannelProfile(reportExecStatusParams);
RuntimeProfile runtimeProfile = profile.buildQueryProfile(true);
Assert.assertNotNull(runtimeProfile);
Assert.assertEquals(2, runtimeProfile.getChildMap().size());
Assert.assertSame(profile.getFragmentProfiles().get(0), runtimeProfile.getChild("Fragment 0"));
RuntimeProfile loadChannelProfile = runtimeProfile.getChild("LoadChannel");
Assert.assertNotNull(loadChannelProfile);
verifyMergedLoadChannelProfile(loadChannelProfile);
}

private void verifyMergedLoadChannelProfile(RuntimeProfile mergedProfile) {
Assert.assertEquals("LoadChannel", mergedProfile.getName());
Assert.assertEquals("288fb1df-f955-472f-a377-cb1e10e4d993", mergedProfile.getInfoString("LoadId"));
Assert.assertEquals("40", mergedProfile.getInfoString("TxnId"));
28 changes: 27 additions & 1 deletion test/sql/test_profile/R/test_load_channel_profile
@@ -44,7 +44,33 @@ INSERT INTO `t0` (v1, v2, v3) values
(2, 4, 12);
-- result:
-- !result
shell: env mysql_cmd="${mysql_cmd} -D${db[0]}" bash ${root_path}/sql/test_profile/T/test_load_channel_profile_analysis.sh
SET enable_profile=true;
-- result:
-- !result
SET enable_async_profile=false;
-- result:
-- !result
INSERT INTO t0 WITH LABEL label_${uuid0} SELECT * FROM t0;
-- result:
-- !result
shell: env mysql_cmd="${mysql_cmd} -D${db[0]}" label="label_${uuid0}" bash ${root_path}/sql/test_profile/T/test_load_channel_profile_analysis.sh
-- result:
0
Analyze profile succeeded
Analyze profile succeeded
-- !result
alter table t0 set('enable_load_profile'='true');
-- result:
-- !result
shell: curl --location-trusted -u root: -H "Expect:100-continue" -H "label: "label_${uuid1}"" -H "column_separator: ," -d '1,2,3' -X PUT ${url}/api/${db[0]}/t0/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
shell: env mysql_cmd="${mysql_cmd} -D${db[0]}" label="label_${uuid1}" bash ${root_path}/sql/test_profile/T/test_load_channel_profile_analysis.sh
-- result:
0
Analyze profile succeeded
9 changes: 8 additions & 1 deletion test/sql/test_profile/T/test_load_channel_profile
@@ -36,4 +36,11 @@ INSERT INTO `t0` (v1, v2, v3) values
(2, 4, 11),
(2, 4, 12);

shell: env mysql_cmd="${mysql_cmd} -D${db[0]}" bash ${root_path}/sql/test_profile/T/test_load_channel_profile_analysis.sh
SET enable_profile=true;
SET enable_async_profile=false;
INSERT INTO t0 WITH LABEL label_${uuid0} SELECT * FROM t0;
shell: env mysql_cmd="${mysql_cmd} -D${db[0]}" label="label_${uuid0}" bash ${root_path}/sql/test_profile/T/test_load_channel_profile_analysis.sh

alter table t0 set('enable_load_profile'='true');
shell: curl --location-trusted -u root: -H "Expect:100-continue" -H "label: "label_${uuid1}"" -H "column_separator: ," -d '1,2,3' -X PUT ${url}/api/${db[0]}/t0/_stream_load
shell: env mysql_cmd="${mysql_cmd} -D${db[0]}" label="label_${uuid1}" bash ${root_path}/sql/test_profile/T/test_load_channel_profile_analysis.sh
@@ -31,10 +31,7 @@ function check_keywords() {
}

sql=$(cat << EOF
SET enable_profile=true;
SET enable_async_profile=false;
INSERT INTO t0 SELECT * FROM t0;
SELECT get_query_profile(last_query_id());
SELECT get_query_profile(PROFILE_ID) AS result FROM information_schema.loads WHERE LABEL = '${label}';
EOF
)
