Make Ballista compatible with Datafusion 37.0.0 (from 36.0.0) (#1031)
* Upgrading Ballista to datafusion 37.0.0.

* Better test debugging information in planner.rs

* Updated test logic in planner.

Since datafusion's apache/datafusion#9236,
HashJoinExec can also apply a projection itself (see the sketch after this list).

* cargo fmt

* cargo fix

* Removed leftover comment

* Make cargo clippy happy

* lint

* Cargo fmt

* Fix tpch build

* Fix comment spelling

* cargo fmt
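
To make the HashJoinExec change concrete: since apache/datafusion#9236, the physical optimizer may fold a projection into the hash join node itself, so plans that used to show a separate ProjectionExec above a HashJoinExec may now show the join alone. The sketch below is illustrative only and is not part of this commit; it assumes `datafusion = "37.0.0"` and `tokio` with the `rt-multi-thread` and `macros` features, and the inline VALUES tables are made up.

```rust
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Join two tiny inline tables and keep only one column from the left.
    let df = ctx
        .sql(
            "SELECT l.a \
             FROM (VALUES (1, 'x'), (2, 'y')) AS l(a, b) \
             JOIN (VALUES (1), (2)) AS r(a) ON l.a = r.a",
        )
        .await?;
    let plan = df.create_physical_plan().await?;
    // Depending on the optimizer's choices, the printed plan may show the
    // projection embedded in the HashJoinExec rather than as its own node.
    println!("{}", displayable(plan.as_ref()).indent(true));
    Ok(())
}
```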
RaphaelMarinier authored Jun 27, 2024
1 parent 3b6964b commit d1aff3e
Showing 21 changed files with 243 additions and 178 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
@@ -30,18 +30,18 @@ exclude = [ "python" ]
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "50.0.0", features=["ipc_compression"] }
-arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "50.0.0", default-features = false }
+arrow = { version = "51.0.0", features=["ipc_compression"] }
+arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "51.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "36.0.0"
-datafusion-cli = "36.0.0"
-datafusion-proto = "36.0.0"
+datafusion = "37.0.0"
+datafusion-cli = "37.0.0"
+datafusion-proto = "37.0.0"
 object_store = "0.9.0"
-sqlparser = "0.43.0"
-tonic = { version = "0.10" }
-tonic-build = { version = "0.10", default-features = false, features = [
+sqlparser = "0.44.0"
+tonic = { version = "0.11" }
+tonic-build = { version = "0.11", default-features = false, features = [
     "transport",
     "prost"
 ] }
5 changes: 2 additions & 3 deletions ballista/client/src/context.rs
@@ -478,9 +478,9 @@ impl BallistaContext {
 #[cfg(feature = "standalone")]
 mod standalone_tests {
     use ballista_core::error::Result;
+    use datafusion::config::TableParquetOptions;
     use datafusion::dataframe::DataFrameWriteOptions;
     use datafusion::datasource::listing::ListingTableUrl;
-    use datafusion::parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
     #[tokio::test]
@@ -507,7 +507,7 @@ mod standalone_tests {
         df.write_parquet(
             &file_path,
             DataFrameWriteOptions::default(),
-            Some(WriterProperties::default()),
+            Some(TableParquetOptions::default()),
         )
         .await?;
         Ok(())
@@ -662,7 +662,6 @@ mod standalone_tests {
             collect_stat: x.collect_stat,
             target_partitions: x.target_partitions,
             file_sort_order: vec![],
-            file_type_write_options: None,
         };
 
         let table_paths = listing_table
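
For reference, a minimal standalone sketch of the `write_parquet` call pattern the hunks above migrate to; it assumes `datafusion = "37.0.0"` and a tokio runtime, and the output path is illustrative:

```rust
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.sql("SELECT 1 AS a").await?;
    // DataFusion 37 takes the config-level TableParquetOptions here, where
    // 36 took the Arrow writer's WriterProperties.
    df.write_parquet(
        "/tmp/write_parquet_example", // illustrative output location
        DataFrameWriteOptions::default(),
        Some(TableParquetOptions::default()),
    )
    .await?;
    Ok(())
}
```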
4 changes: 2 additions & 2 deletions ballista/core/src/client.rs
@@ -28,6 +28,7 @@ use std::{
 use crate::error::{BallistaError, Result};
 use crate::serde::scheduler::{Action, PartitionId};
 
+use arrow_flight;
 use arrow_flight::utils::flight_data_to_arrow_batch;
 use arrow_flight::Ticket;
 use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
@@ -137,7 +138,6 @@ impl BallistaClient {
             ticket: buf.clone().into(),
         });
         let result = self.flight_client.do_get(request).await;
-
         let res = match result {
             Ok(res) => res,
             Err(ref err) => {
@@ -156,11 +156,11 @@
         };
 
         let mut stream = res.into_inner();
-
         match stream.message().await {
             Ok(res) => {
                 return match res {
                     Some(flight_data) => {
+                        // convert FlightData to a stream
                         let schema = Arc::new(Schema::try_from(&flight_data)?);
 
                         // all the remaining stream messages should be dictionary and record batches
32 changes: 23 additions & 9 deletions ballista/core/src/execution_plans/distributed_query.rs
@@ -30,11 +30,11 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::LogicalPlan;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
+use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-    Statistics,
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream, Statistics,
 };
 use datafusion_proto::logical_plan::{
     AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
@@ -65,6 +65,7 @@ pub struct DistributedQueryExec<T: 'static + AsLogicalPlan> {
     plan_repr: PhantomData<T>,
     /// Session id
     session_id: String,
+    properties: PlanProperties,
 }
 
 impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
@@ -74,13 +75,15 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
         plan: LogicalPlan,
         session_id: String,
     ) -> Self {
+        let properties = Self::compute_properties(plan.schema().as_ref().clone().into());
         Self {
             scheduler_url,
             config,
             plan,
             extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),
             plan_repr: PhantomData,
             session_id,
+            properties,
         }
     }
 
@@ -91,13 +94,15 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
         extension_codec: Arc<dyn LogicalExtensionCodec>,
         session_id: String,
     ) -> Self {
+        let properties = Self::compute_properties(plan.schema().as_ref().clone().into());
         Self {
             scheduler_url,
             config,
             plan,
             extension_codec,
             plan_repr: PhantomData,
             session_id,
+            properties,
         }
     }
 
@@ -109,15 +114,25 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
         plan_repr: PhantomData<T>,
         session_id: String,
     ) -> Self {
+        let properties = Self::compute_properties(plan.schema().as_ref().clone().into());
         Self {
             scheduler_url,
             config,
             plan,
             extension_codec,
             plan_repr,
             session_id,
+            properties,
         }
     }
+
+    fn compute_properties(schema: SchemaRef) -> PlanProperties {
+        PlanProperties::new(
+            EquivalenceProperties::new(schema),
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
+    }
 }
 
 impl<T: 'static + AsLogicalPlan> DisplayAs for DistributedQueryExec<T> {
@@ -147,12 +162,8 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
         self.plan.schema().as_ref().clone().into()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -170,6 +181,9 @@
             extension_codec: self.extension_codec.clone(),
             plan_repr: self.plan_repr,
             session_id: self.session_id.clone(),
+            properties: Self::compute_properties(
+                self.plan.schema().as_ref().clone().into(),
+            ),
         }))
     }
 
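
The recurring edit in this file is the DataFusion 37 `ExecutionPlan` change: the trait's `output_partitioning()` and `output_ordering()` methods are gone, and an operator instead caches a `PlanProperties` at construction time and returns it by reference from `properties()`. Below is a minimal sketch of that construction, mirroring the `compute_properties` calls above; it assumes `datafusion = "37.0.0"`, and the one-column schema is illustrative:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::{ExecutionMode, Partitioning, PlanProperties};

/// Build the PlanProperties an operator caches once and then returns by
/// reference from ExecutionPlan::properties().
fn compute_properties(schema: SchemaRef) -> PlanProperties {
    PlanProperties::new(
        // No orderings or equivalences are known beyond the schema itself.
        EquivalenceProperties::new(schema),
        // The distributed scheduler decides the real partitioning, so it is
        // reported as unknown here.
        Partitioning::UnknownPartitioning(1),
        // The plan yields a finite result set rather than an endless stream.
        ExecutionMode::Bounded,
    )
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        false,
    )]));
    let _properties = compute_properties(schema);
}
```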
50 changes: 24 additions & 26 deletions ballista/core/src/execution_plans/shuffle_reader.rs
@@ -34,26 +34,24 @@ use crate::serde::scheduler::{PartitionLocation, PartitionStats};
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::common::runtime::SpawnedTask;
 
 use datafusion::error::Result;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
     ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
-    RecordBatchStream, SendableRecordBatchStream, Statistics,
+    PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use futures::{Stream, StreamExt, TryStreamExt};
 
 use crate::error::BallistaError;
 use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::common::AbortOnDropMany;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use itertools::Itertools;
 use log::{error, info};
 use rand::prelude::SliceRandom;
 use rand::thread_rng;
 use tokio::sync::{mpsc, Semaphore};
-use tokio::task::JoinHandle;
 use tokio_stream::wrappers::ReceiverStream;
 
 /// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
@@ -67,6 +65,7 @@ pub struct ShuffleReaderExec {
     pub partition: Vec<Vec<PartitionLocation>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    properties: PlanProperties,
 }
 
 impl ShuffleReaderExec {
@@ -76,11 +75,19 @@ impl ShuffleReaderExec {
         partition: Vec<Vec<PartitionLocation>>,
         schema: SchemaRef,
     ) -> Result<Self> {
+        let properties = PlanProperties::new(
+            datafusion::physical_expr::EquivalenceProperties::new(schema.clone()),
+            // TODO partitioning may be known and could be populated here
+            // see https://github.com/apache/arrow-datafusion/issues/758
+            Partitioning::UnknownPartitioning(partition.len()),
+            datafusion::physical_plan::ExecutionMode::Bounded,
+        );
         Ok(Self {
             stage_id,
             schema,
             partition,
             metrics: ExecutionPlanMetricsSet::new(),
+            properties,
         })
     }
 }
@@ -108,16 +115,9 @@ impl ExecutionPlan for ShuffleReaderExec {
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        // TODO partitioning may be known and could be populated here
-        // see https://github.com/apache/arrow-datafusion/issues/758
-        Partitioning::UnknownPartitioning(self.partition.len())
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
     }
-
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![]
     }
@@ -244,7 +244,7 @@ struct AbortableReceiverStream {
     inner: ReceiverStream<result::Result<SendableRecordBatchStream, BallistaError>>,
 
     #[allow(dead_code)]
-    drop_helper: AbortOnDropMany<()>,
+    drop_helper: Vec<SpawnedTask<()>>,
 }
 
 impl AbortableReceiverStream {
@@ -253,12 +253,12 @@ impl AbortableReceiverStream {
         rx: tokio::sync::mpsc::Receiver<
             result::Result<SendableRecordBatchStream, BallistaError>,
         >,
-        join_handles: Vec<JoinHandle<()>>,
+        spawned_tasks: Vec<SpawnedTask<()>>,
     ) -> AbortableReceiverStream {
         let inner = ReceiverStream::new(rx);
         Self {
             inner,
-            drop_helper: AbortOnDropMany(join_handles),
+            drop_helper: spawned_tasks,
         }
     }
 }
@@ -282,7 +282,7 @@ fn send_fetch_partitions(
 ) -> AbortableReceiverStream {
     let (response_sender, response_receiver) = mpsc::channel(max_request_num);
     let semaphore = Arc::new(Semaphore::new(max_request_num));
-    let mut join_handles = vec![];
+    let mut spawned_tasks: Vec<SpawnedTask<()>> = vec![];
     let (local_locations, remote_locations): (Vec<_>, Vec<_>) = partition_locations
         .into_iter()
         .partition(check_is_local_location);
@@ -295,34 +295,32 @@
 
     // keep local shuffle files reading in serial order for memory control.
     let response_sender_c = response_sender.clone();
-    let join_handle = tokio::spawn(async move {
+    spawned_tasks.push(SpawnedTask::spawn(async move {
        for p in local_locations {
            let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
            if let Err(e) = response_sender_c.send(r).await {
                error!("Fail to send response event to the channel due to {}", e);
            }
        }
-    });
-    join_handles.push(join_handle);
+    }));
 
     for p in remote_locations.into_iter() {
         let semaphore = semaphore.clone();
         let response_sender = response_sender.clone();
-        let join_handle = tokio::spawn(async move {
-            // Block if exceeds max request number
+        spawned_tasks.push(SpawnedTask::spawn(async move {
+            // Block if exceeds max request number.
             let permit = semaphore.acquire_owned().await.unwrap();
             let r = PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
-            // Block if the channel buffer is ful
+            // Block if the channel buffer is full.
             if let Err(e) = response_sender.send(r).await {
                 error!("Fail to send response event to the channel due to {}", e);
             }
             // Increase semaphore by dropping existing permits.
             drop(permit);
-        });
-        join_handles.push(join_handle);
+        }));
     }
 
-    AbortableReceiverStream::create(response_receiver, join_handles)
+    AbortableReceiverStream::create(response_receiver, spawned_tasks)
 }
 
 fn check_is_local_location(location: &PartitionLocation) -> bool {
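
A standalone sketch of the task-management pattern above: `SpawnedTask` (from `datafusion::common::runtime`) aborts its inner task when dropped, so a plain `Vec<SpawnedTask<()>>` replaces `AbortOnDropMany` over raw `JoinHandle`s, while a semaphore still bounds the number of in-flight fetches. It assumes `datafusion = "37.0.0"` and tokio with the `rt-multi-thread`, `sync`, `time`, and `macros` features; the sleep stands in for a real partition fetch:

```rust
use std::sync::Arc;
use std::time::Duration;

use datafusion::common::runtime::SpawnedTask;
use tokio::sync::{mpsc, Semaphore};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(4);
    // At most two simulated fetches may be in flight at any moment.
    let semaphore = Arc::new(Semaphore::new(2));
    let mut tasks: Vec<SpawnedTask<()>> = vec![];

    for id in 0..8u64 {
        let semaphore = semaphore.clone();
        let tx = tx.clone();
        tasks.push(SpawnedTask::spawn(async move {
            // Block if the maximum number of requests is already out.
            let _permit = semaphore.acquire_owned().await.unwrap();
            tokio::time::sleep(Duration::from_millis(10)).await; // fake fetch
            let _ = tx.send(id).await;
        }));
    }
    drop(tx); // close the channel once every worker has finished

    while let Some(id) = rx.recv().await {
        println!("fetched partition {id}");
    }
    // Had `tasks` been dropped earlier, any still-running fetch would have
    // been aborted, which is the behavior AbortOnDropMany used to provide.
}
```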
[The remaining 16 changed files are not shown.]