Skip to content

Commit

Permalink
Fix zmem for aggregation mainly and tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Berg <[email protected]>
  • Loading branch information
seberg committed Jan 9, 2025
1 parent f7b4721 commit c557e14
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 8 deletions.
34 changes: 33 additions & 1 deletion cpp/src/core/library.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,41 @@ class Mapper : public legate::mapping::Mapper {
std::optional<std::size_t> allocation_pool_size(const legate::mapping::Task& task,
legate::mapping::StoreTarget memory_kind)
{
const auto task_id = static_cast<int>(task.task_id());

if (memory_kind == legate::mapping::StoreTarget::ZCMEM) {
switch (task_id) {
case legate::dataframe::task::OpCode::Join:
// TODO: Join is identical to GroupBy, but we would have to look at
// both input tables to ge the maximum column number.
return std::nullopt;
case legate::dataframe::task::OpCode::GroupByAggregation: {
// Join and Aggregation use repartitioning and use ZCMEM for NCCL.
// This depends on the number of columns (first argument to task
// is a vector of these columns).
// Join repartitions the table one after the other so one is enough.
if (task.is_single_task()) {
// No need for repartitioning, so no need for ZCMEM
return 0;
}
auto num_cols = task.scalars().at(0).value<int32_t>();
auto nrank = task.get_launch_domain().get_volume();

// Space for the exchange buffers containing size chunks:
size_t size_exchange_nbytes = nrank * nrank * 2 * sizeof(std::size_t);
// Space for column packing metadata exchange
size_t sizeof_serialized_column = 6 * 8; // not public, rough upper bound
// num_cols * 2 for the string column child and + 1 for the number of cols
size_t metadata_nbytes = nrank * (num_cols * 2 + 1) * sizeof_serialized_column;

return size_exchange_nbytes + metadata_nbytes;
}
default: return 0;
}
}

// TODO: Returning nullopt prevents other parallel task launches so it would be
// good to provide estimated usage for most tasks here.
if (memory_kind == legate::mapping::StoreTarget::ZCMEM) { return 0; }
return std::nullopt;
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/core/repartition_by_hash.cu
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class ExchangedSizes {
: _ctx(ctx), stream(ctx.tmp_stream())
{
assert(columns.size() == ctx.nranks - 1);
// Note: Size of this buffer is taken into account in the mapper:
_all_sizes =
legate::create_buffer<std::size_t>(ctx.nranks * ctx.nranks * 2, Memory::Kind::Z_COPY_MEM);

Expand Down Expand Up @@ -151,6 +152,7 @@ std::pair<std::vector<cudf::table_view>, std::map<int, rmm::device_buffer>> shuf
std::size_t nbytes = sizes.metadata(peer, ctx.rank);
if (nbytes > 0) {
assert(peer != ctx.rank);
// Note: Size of this buffer is taken into account in the mapper:
recv_metadata.insert(
{peer, legate::create_buffer<uint8_t>(nbytes, Memory::Kind::Z_COPY_MEM)});
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/tests/test_cudf_interop.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,8 @@ namespace {
static const char* library_name = "test.cudf_interop";

struct RoundTripTableTask : public legate::LegateTask<RoundTripTableTask> {
static constexpr auto TASK_ID = legate::LocalTaskID{0};
static constexpr auto TASK_ID = legate::LocalTaskID{0};
static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{}.with_has_allocations(true);

static void gpu_variant(legate::TaskContext context)
{
Expand Down
5 changes: 3 additions & 2 deletions cpp/tests/test_repartition_by_hash.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,7 +38,8 @@ namespace {
static const char* library_name = "test.repartition_by_hash";

struct CheckHash : public legate::LegateTask<CheckHash> {
static constexpr auto TASK_ID = legate::LocalTaskID{0};
static constexpr auto TASK_ID = legate::LocalTaskID{0};
static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{}.with_has_allocations(true);

static void gpu_variant(legate::TaskContext context)
{
Expand Down
8 changes: 5 additions & 3 deletions cpp/tests/test_task.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,8 @@ namespace {
static const char* library_name = "test.global_row_offset";

struct GlobalRowOffsetTask : public legate::LegateTask<GlobalRowOffsetTask> {
static constexpr auto TASK_ID = legate::LocalTaskID{0};
static constexpr auto TASK_ID = legate::LocalTaskID{0};
static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{}.with_has_allocations(true);

static void gpu_variant(legate::TaskContext context)
{
Expand All @@ -54,7 +55,8 @@ struct GlobalRowOffsetTask : public legate::LegateTask<GlobalRowOffsetTask> {
};

struct TaskArgumentMix : public legate::LegateTask<TaskArgumentMix> {
static constexpr auto TASK_ID = legate::LocalTaskID{1};
static constexpr auto TASK_ID = legate::LocalTaskID{1};
static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{}.with_has_allocations(true);

static void gpu_variant(legate::TaskContext context)
{
Expand Down

0 comments on commit c557e14

Please sign in to comment.