Upgrade to Datafusion 41 (#1062)
* upgrade dependencies, some fixes, still wip
* compiles
* add license
* Update datafusion protobuf definitions (#1057)
* update datafusion proto defs
* allow optionals in proto3
* update docker environment for higher protoc version
* runs
* test e2e, fix python
* rm unnecessary dependency
* create BallistaLogicalExtensionCodec that can decode/encode file formats, fix some tests
* fix tests
* clippy, tomlfmt
* fix grpc connect info extract
* extract into method, remove unnecessary log
* datafusion to 41, adjust other deps
palaska authored Sep 27, 2024
1 parent 51adfb0 commit 2f223db
Showing 35 changed files with 653 additions and 543 deletions.
26 changes: 15 additions & 11 deletions Cargo.toml
@@ -21,19 +21,23 @@ members = ["ballista-cli", "ballista/cache", "ballista/client", "ballista/core",
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "52.0.0", features = ["ipc_compression"] }
-arrow-flight = { version = "52.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "52.0.0", default-features = false }
+arrow = { version = "52.2.0", features = ["ipc_compression"] }
+arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "52.2.0", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "39.0.0"
-datafusion-cli = "39.0.0"
-datafusion-proto = "39.0.0"
-datafusion-proto-common = "39.0.0"
-object_store = "0.10.1"
-sqlparser = "0.47.0"
-tonic = { version = "0.11" }
-tonic-build = { version = "0.11", default-features = false, features = [
+# bump directly to datafusion v43 to avoid the serde bug on v42 (https://github.com/apache/datafusion/pull/12626)
+datafusion = "41.0.0"
+datafusion-cli = "41.0.0"
+datafusion-proto = "41.0.0"
+datafusion-proto-common = "41.0.0"
+object_store = "0.10.2"
+prost = "0.12.0"
+prost-types = "0.12.0"
+sqlparser = "0.49.0"
+tonic = { version = "0.11.0" }
+tonic-build = { version = "0.11.0", default-features = false, features = [
     "transport",
     "prost"
 ] }
4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
@@ -30,14 +30,14 @@ readme = "README.md"

 [dependencies]
 ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"] }
-clap = { version = "3", features = ["derive", "cargo"] }
+clap = { workspace = true }
 datafusion = { workspace = true }
 datafusion-cli = { workspace = true }
 dirs = "5.0.1"
 env_logger = "0.10"
 mimalloc = { version = "0.1", default-features = false }
 num_cpus = "1.13.0"
-rustyline = "11.0"
+rustyline = "11.0.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
 
 [features]
28 changes: 14 additions & 14 deletions ballista-cli/src/main.rs
@@ -36,22 +36,22 @@ struct Args {
         short = 'p',
         long,
         help = "Path to your data, default to current directory",
-        validator(is_valid_data_dir)
+        value_parser(parse_valid_data_dir)
     )]
     data_path: Option<String>,
 
     #[clap(
         short = 'c',
         long,
         help = "The batch size of each query, or use Ballista default",
-        validator(is_valid_batch_size)
+        value_parser(parse_batch_size)
     )]
     batch_size: Option<usize>,
 
     #[clap(
         long,
         help = "The max concurrent tasks, only for Ballista local mode. Default: all available cores",
-        validator(is_valid_concurrent_tasks_size)
+        value_parser(parse_valid_concurrent_tasks_size)
    )]
     concurrent_tasks: Option<usize>,

@@ -60,7 +60,7 @@ struct Args {
         long,
         multiple_values = true,
         help = "Execute commands from file(s), then exit",
-        validator(is_valid_file)
+        value_parser(parse_valid_file)
     )]
     file: Vec<String>,

@@ -69,12 +69,12 @@ struct Args {
         long,
         multiple_values = true,
         help = "Run the provided files on startup instead of ~/.ballistarc",
-        validator(is_valid_file),
+        value_parser(parse_valid_file),
         conflicts_with = "file"
     )]
     rc: Option<Vec<String>>,
 
-    #[clap(long, arg_enum, default_value_t = PrintFormat::Table)]
+    #[clap(long, value_enum, default_value_t = PrintFormat::Table)]
     format: PrintFormat,
 
     #[clap(long, help = "Ballista scheduler host")]
@@ -168,32 +168,32 @@ pub async fn main() -> Result<()> {
     Ok(())
 }
 
-fn is_valid_file(dir: &str) -> std::result::Result<(), String> {
+fn parse_valid_file(dir: &str) -> std::result::Result<String, String> {
     if Path::new(dir).is_file() {
-        Ok(())
+        Ok(dir.to_string())
     } else {
         Err(format!("Invalid file '{dir}'"))
     }
 }
 
-fn is_valid_data_dir(dir: &str) -> std::result::Result<(), String> {
+fn parse_valid_data_dir(dir: &str) -> std::result::Result<String, String> {
     if Path::new(dir).is_dir() {
-        Ok(())
+        Ok(dir.to_string())
     } else {
         Err(format!("Invalid data directory '{dir}'"))
     }
 }
 
-fn is_valid_batch_size(size: &str) -> std::result::Result<(), String> {
+fn parse_batch_size(size: &str) -> std::result::Result<usize, String> {
     match size.parse::<usize>() {
-        Ok(size) if size > 0 => Ok(()),
+        Ok(size) if size > 0 => Ok(size),
         _ => Err(format!("Invalid batch size '{size}'")),
     }
 }
 
-fn is_valid_concurrent_tasks_size(size: &str) -> std::result::Result<(), String> {
+fn parse_valid_concurrent_tasks_size(size: &str) -> std::result::Result<usize, String> {
     match size.parse::<usize>() {
-        Ok(size) if size > 0 => Ok(()),
+        Ok(size) if size > 0 => Ok(size),
         _ => Err(format!("Invalid concurrent_tasks size '{size}'")),
     }
 }
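
A note on the `validator` → `value_parser` migration above: clap 3.2 deprecated `validator`, which could only accept or reject the raw string, in favor of `value_parser`, which validates and converts in one step — hence the functions above now return the parsed value instead of `()`. A minimal, self-contained sketch of the pattern (the `parse_port` helper and `--port` flag are hypothetical, not part of this commit):

```rust
use clap::Parser;

// Hypothetical parser for illustration: it returns the converted value on
// success, so the field below can be a typed u16 instead of a String.
fn parse_port(s: &str) -> Result<u16, String> {
    s.parse::<u16>()
        .map_err(|e| format!("invalid port '{s}': {e}"))
}

#[derive(Parser, Debug)]
struct Cli {
    // clap runs parse_port and stores its Ok value directly in the field.
    #[clap(long, value_parser(parse_port))]
    port: u16,
}

fn main() {
    let cli = Cli::parse();
    println!("port = {}", cli.port);
}
```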
3 changes: 2 additions & 1 deletion ballista/client/README.md
@@ -92,7 +92,8 @@ data set. Download the file and add it to the `testdata` folder before running t

 ```rust,no_run
 use ballista::prelude::*;
-use datafusion::prelude::{col, min, max, avg, sum, ParquetReadOptions};
+use datafusion::prelude::{col, ParquetReadOptions};
+use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, average::avg};
 
 #[tokio::main]
 async fn main() -> Result<()> {
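
For context on the README change above: DataFusion 41 removed the `min`, `max`, `sum`, and `avg` expression builders from the prelude; they now live under `datafusion::functions_aggregate`. A hedged sketch of the updated imports in use (the scheduler address, config options, and sample Parquet file are illustrative assumptions):

```rust
use ballista::prelude::*;
use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, average::avg};
use datafusion::prelude::{col, ParquetReadOptions};

#[tokio::main]
async fn main() -> Result<()> {
    let config = BallistaConfig::builder().build()?;
    // Assumes a Ballista scheduler is listening locally on the default port.
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
    let df = ctx
        .read_parquet("testdata/alltypes_plain.parquet", ParquetReadOptions::default())
        .await?
        // Same builder API as before; only the import paths changed.
        .aggregate(
            vec![],
            vec![min(col("id")), max(col("id")), sum(col("id")), avg(col("id"))],
        )?;
    df.show().await?;
    Ok(())
}
```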
32 changes: 17 additions & 15 deletions ballista/client/src/context.rs
@@ -19,6 +19,7 @@
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::execution::context::DataFilePaths;
+use datafusion::sql::TableReference;
 use log::info;
 use parking_lot::Mutex;
 use sqlparser::ast::Statement;
@@ -33,7 +34,6 @@ use ballista_core::utils::{
 };
 use datafusion_proto::protobuf::LogicalPlanNode;
 
-use datafusion::catalog::TableReference;
 use datafusion::dataframe::DataFrame;
 use datafusion::datasource::{source_as_provider, TableProvider};
 use datafusion::error::{DataFusionError, Result};
@@ -791,7 +791,7 @@ mod standalone_tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+--------------+",
-            "| MIN(test.id) |",
+            "| min(test.id) |",
             "+--------------+",
             "| 0            |",
             "+--------------+",
@@ -802,7 +802,7 @@
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+--------------+",
-            "| MAX(test.id) |",
+            "| max(test.id) |",
             "+--------------+",
             "| 7            |",
             "+--------------+",
@@ -818,7 +818,7 @@
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+--------------+",
-            "| SUM(test.id) |",
+            "| sum(test.id) |",
             "+--------------+",
             "| 28           |",
             "+--------------+",
@@ -833,7 +833,7 @@
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+--------------+",
-            "| AVG(test.id) |",
+            "| avg(test.id) |",
             "+--------------+",
             "| 3.5          |",
             "+--------------+",
@@ -849,7 +849,7 @@
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+----------------+",
-            "| COUNT(test.id) |",
+            "| count(test.id) |",
             "+----------------+",
             "| 8              |",
             "+----------------+",
@@ -867,7 +867,7 @@
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+--------------------------+",
-            "| APPROX_DISTINCT(test.id) |",
+            "| approx_distinct(test.id) |",
             "+--------------------------+",
             "| 8                        |",
             "+--------------------------+",
@@ -885,7 +885,7 @@
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+--------------------------+",
-            "| ARRAY_AGG(test.id)       |",
+            "| array_agg(test.id)       |",
             "+--------------------------+",
             "| [4, 5, 6, 7, 2, 3, 0, 1] |",
             "+--------------------------+",
@@ -914,7 +914,7 @@ mod standalone_tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+-------------------+",
-            "| VAR_POP(test.id)  |",
+            "| var_pop(test.id)  |",
             "+-------------------+",
             "| 5.250000000000001 |",
             "+-------------------+",
@@ -946,7 +946,7 @@ mod standalone_tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+--------------------+",
-            "| STDDEV(test.id)    |",
+            "| stddev(test.id)    |",
             "+--------------------+",
             "| 2.4494897427831783 |",
             "+--------------------+",
@@ -960,7 +960,7 @@
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+--------------------+",
-            "| STDDEV(test.id)    |",
+            "| stddev(test.id)    |",
             "+--------------------+",
             "| 2.4494897427831783 |",
             "+--------------------+",
@@ -996,25 +996,27 @@ mod standalone_tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+--------------------------------+",
-            "| CORR(test.id,test.tinyint_col) |",
+            "| corr(test.id,test.tinyint_col) |",
             "+--------------------------------+",
             "| 0.21821789023599245            |",
             "+--------------------------------+",
         ];
         assert_result_eq(expected, &res);
     }
 
+    // enable when upgrading Datafusion to > 42
+    #[ignore]
     #[tokio::test]
     async fn test_aggregate_approx_percentile() {
         let context = create_test_context().await;
 
         let df = context
-            .sql("select approx_percentile_cont_with_weight(\"id\", 2, 0.5) from test")
+            .sql("select approx_percentile_cont_with_weight(id, 2, 0.5) from test")
             .await
             .unwrap();
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+-------------------------------------------------------------------+",
-            "| APPROX_PERCENTILE_CONT_WITH_WEIGHT(test.id,Int64(2),Float64(0.5)) |",
+            "| approx_percentile_cont_with_weight(test.id,Int64(2),Float64(0.5)) |",
             "+-------------------------------------------------------------------+",
             "| 1                                                                 |",
             "+-------------------------------------------------------------------+",
@@ -1028,7 +1030,7 @@
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+------------------------------------------------------+",
-            "| APPROX_PERCENTILE_CONT(test.double_col,Float64(0.5)) |",
+            "| approx_percentile_cont(test.double_col,Float64(0.5)) |",
             "+------------------------------------------------------+",
             "| 7.574999999999999                                    |",
             "+------------------------------------------------------+",
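
The renamed column headers in the tests above are a behavior change, not a cosmetic edit: DataFusion 41 moves the built-in aggregates to UDAF-based implementations whose display names are lowercase, so result columns render as `min(test.id)` instead of `MIN(test.id)`. A small standalone sketch of the effect (the table and data are illustrative):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)").await?;
    // However the SQL is capitalized, DataFusion 41 renders the output
    // column in lowercase: "min(t.column1)" rather than "MIN(t.column1)".
    let df = ctx.sql("SELECT MIN(column1) FROM t").await?;
    df.show().await?;
    Ok(())
}
```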
6 changes: 3 additions & 3 deletions ballista/core/Cargo.toml
@@ -51,7 +51,7 @@ async-trait = "0.1.41"
 ballista-cache = { path = "../cache", version = "0.12.0" }
 bytes = "1.0"
 chrono = { version = "0.4", default-features = false }
-clap = { version = "3", features = ["derive", "cargo"] }
+clap = { workspace = true }
 datafusion = { workspace = true }
 datafusion-objectstore-hdfs = { version = "0.1.4", default-features = false, optional = true }
 datafusion-proto = { workspace = true }
@@ -68,8 +68,8 @@ once_cell = "1.9.0"

 parking_lot = "0.12"
 parse_arg = "0.1.3"
-prost = "0.12"
-prost-types = "0.12"
+prost = { workspace = true }
+prost-types = { workspace = true }
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
 sqlparser = { workspace = true }
14 changes: 7 additions & 7 deletions ballista/core/src/config.rs
@@ -18,7 +18,7 @@

 //! Ballista configuration
 
-use clap::ArgEnum;
+use clap::ValueEnum;
 use core::fmt;
 use std::collections::HashMap;
 use std::result;
@@ -307,7 +307,7 @@ impl BallistaConfig {

 // an enum used to configure the scheduler policy
 // needs to be visible to code generated by configure_me
-#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
+#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)]
 pub enum TaskSchedulingPolicy {
     PullStaged,
     PushStaged,
@@ -317,7 +317,7 @@ impl std::str::FromStr for TaskSchedulingPolicy {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        ArgEnum::from_str(s, true)
+        ValueEnum::from_str(s, true)
     }
 }

@@ -329,7 +329,7 @@ impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {

 // an enum used to configure the log rolling policy
 // needs to be visible to code generated by configure_me
-#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
+#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)]
 pub enum LogRotationPolicy {
     Minutely,
     Hourly,
@@ -341,7 +341,7 @@ impl std::str::FromStr for LogRotationPolicy {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        ArgEnum::from_str(s, true)
+        ValueEnum::from_str(s, true)
     }
 }

@@ -353,7 +353,7 @@ impl parse_arg::ParseArgFromStr for LogRotationPolicy {

 // an enum used to configure the source data cache policy
 // needs to be visible to code generated by configure_me
-#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
+#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)]
 pub enum DataCachePolicy {
     LocalDiskFile,
 }
@@ -362,7 +362,7 @@ impl std::str::FromStr for DataCachePolicy {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        ArgEnum::from_str(s, true)
+        ValueEnum::from_str(s, true)
     }
 }

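
Background for the config.rs edits above: clap 3.2 renamed `ArgEnum` to `ValueEnum`; the derive and its case-insensitive `from_str` helper keep the same shape, which is why only the identifier changes here. A minimal sketch (the `Policy` enum is illustrative):

```rust
use clap::ValueEnum;

#[derive(Clone, Copy, Debug, ValueEnum)]
enum Policy {
    PullStaged,
    PushStaged,
}

fn main() {
    // Variants are exposed under kebab-case names; `true` requests
    // case-insensitive matching, mirroring the from_str impls above.
    let policy: Policy = ValueEnum::from_str("pull-staged", true).unwrap();
    println!("{policy:?}"); // prints: PullStaged
}
```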
4 changes: 4 additions & 0 deletions ballista/core/src/execution_plans/distributed_query.rs
@@ -154,6 +154,10 @@ impl<T: 'static + AsLogicalPlan> DisplayAs for DistributedQueryExec<T> {
 }
 
 impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
+    fn name(&self) -> &str {
+        "DistributedQueryExec"
+    }
+
     fn as_any(&self) -> &dyn Any {
         self
     }
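
The `name` method added above satisfies a new `ExecutionPlan` requirement in this DataFusion release: every operator reports a human-readable name. A small sketch querying it on a built-in operator (API paths assume DataFusion 41):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;

fn main() {
    let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    // Built-in operators implement it too; Ballista's DistributedQueryExec
    // now answers "DistributedQueryExec" from the same method.
    println!("operator = {}", plan.name()); // prints: operator = EmptyExec
}
```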