diff --git a/Cargo.toml b/Cargo.toml index e77e54684..007482bf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,23 +21,23 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo resolver = "2" [workspace.dependencies] -arrow = { version = "52.2.0", features = ["ipc_compression"] } -arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] } -arrow-schema = { version = "52.2.0", default-features = false } +arrow = { version = "53", features = ["ipc_compression"] } +arrow-flight = { version = "53", features = ["flight-sql-experimental"] } +arrow-schema = { version = "53", default-features = false } clap = { version = "3", features = ["derive", "cargo"] } configure_me = { version = "0.4.0" } configure_me_codegen = { version = "0.4.4" } # bump directly to datafusion v43 to avoid the serde bug on v42 (https://github.com/apache/datafusion/pull/12626) -datafusion = "41.0.0" -datafusion-cli = "41.0.0" -datafusion-proto = "41.0.0" -datafusion-proto-common = "41.0.0" -object_store = "0.10.2" -prost = "0.12.0" -prost-types = "0.12.0" -sqlparser = "0.49.0" -tonic = { version = "0.11.0" } -tonic-build = { version = "0.11.0", default-features = false, features = [ +datafusion = "42.0.0" +datafusion-cli = "42.0.0" +datafusion-proto = "42.0.0" +datafusion-proto-common = "42.0.0" +object_store = "0.11" +prost = "0.13" +prost-types = "0.13" +sqlparser = "0.50" +tonic = { version = "0.12" } +tonic-build = { version = "0.12", default-features = false, features = [ "transport", "prost" ] } diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml index 16ff0b54c..891f5a7ce 100644 --- a/ballista-cli/Cargo.toml +++ b/ballista-cli/Cargo.toml @@ -30,14 +30,15 @@ readme = "README.md" [dependencies] ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"] } -clap = { workspace = true } +# datafusion-cli uses 4.5 clap, thus it does not depend on workspace +clap = { version = "4.5", features = ["derive", "cargo"] } datafusion = { workspace = true } datafusion-cli = { workspace = true } dirs = "5.0.1" env_logger = { workspace = true } mimalloc = { version = "0.1", default-features = false } num_cpus = { workspace = true } -rustyline = "11.0.0" +rustyline = "14.0.0" tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } [features] diff --git a/ballista-cli/src/command.rs b/ballista-cli/src/command.rs index 0c407cf0f..2123713ad 100644 --- a/ballista-cli/src/command.rs +++ b/ballista-cli/src/command.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use std::time::Instant; use ballista::prelude::{BallistaContext, BallistaError, Result}; -use clap::ArgEnum; + use datafusion::arrow::array::{ArrayRef, StringArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; @@ -223,7 +223,7 @@ impl OutputFormat { Err(BallistaError::General(format!( "{:?} is not a valid format type [possible values: {:?}]", format, - PrintFormat::value_variants() + "TO BE FIXED", //PrintFormat::value_variants() ))) } } diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs index 6aeecd6c9..0fd6ddfdb 100644 --- a/ballista-cli/src/main.rs +++ b/ballista-cli/src/main.rs @@ -58,7 +58,7 @@ struct Args { #[clap( short, long, - multiple_values = true, + num_args = 0.., help = "Execute commands from file(s), then exit", value_parser(parse_valid_file) )] @@ -67,7 +67,7 @@ struct Args { #[clap( short = 'r', long, - multiple_values = true, + num_args = 0.., help = "Run the provided files on startup instead of ~/.ballistarc", value_parser(parse_valid_file), conflicts_with = "file" diff --git a/ballista/core/build.rs b/ballista/core/build.rs index 6e501b88f..4fe7e3bf6 100644 --- a/ballista/core/build.rs +++ b/ballista/core/build.rs @@ -44,7 +44,7 @@ fn main() -> Result<(), String> { .extern_path(".datafusion_common", "::datafusion_proto_common") .extern_path(".datafusion", "::datafusion_proto::protobuf") .protoc_arg("--experimental_allow_proto3_optional") - .compile(&["proto/ballista.proto"], &["proto"]) + .compile_protos(&["proto/ballista.proto"], &["proto"]) .map_err(|e| format!("protobuf compilation failed: {e}"))?; let generated_source_path = out.join("ballista.protobuf.rs"); let code = std::fs::read_to_string(generated_source_path).unwrap(); diff --git a/ballista/core/proto/datafusion.proto b/ballista/core/proto/datafusion.proto index 8402b92fb..cf166ba93 100644 --- a/ballista/core/proto/datafusion.proto +++ b/ballista/core/proto/datafusion.proto @@ -75,6 +75,10 @@ message LogicalExprNodeCollection { repeated LogicalExprNode logical_expr_nodes = 1; } +message SortExprNodeCollection { + repeated SortExprNode sort_expr_nodes = 1; +} + message ListingTableScanNode { reserved 1; // was string table_name TableReference table_name = 14; @@ -90,8 +94,9 @@ message ListingTableScanNode { datafusion_common.CsvFormat csv = 10; datafusion_common.ParquetFormat parquet = 11; datafusion_common.AvroFormat avro = 12; + datafusion_common.NdJsonFormat json = 15; } - repeated LogicalExprNodeCollection file_sort_order = 13; + repeated SortExprNodeCollection file_sort_order = 13; } message ViewTableScanNode { @@ -128,7 +133,7 @@ message SelectionNode { message SortNode { LogicalPlanNode input = 1; - repeated LogicalExprNode expr = 2; + repeated SortExprNode expr = 2; // Maximum number of highest/lowest rows to fetch; negative means no limit int64 fetch = 3; } @@ -159,12 +164,12 @@ message CreateExternalTableNode { repeated string table_partition_cols = 5; bool if_not_exists = 6; string definition = 7; - repeated LogicalExprNodeCollection order_exprs = 10; + repeated SortExprNodeCollection order_exprs = 10; bool unbounded = 11; map options = 8; datafusion_common.Constraints constraints = 12; map column_defaults = 13; - } +} message PrepareNode { string name = 1; @@ -244,35 +249,51 @@ message DistinctNode { message DistinctOnNode { repeated LogicalExprNode on_expr = 1; repeated LogicalExprNode select_expr = 2; - repeated LogicalExprNode sort_expr = 3; + repeated SortExprNode sort_expr = 3; LogicalPlanNode input = 4; } message CopyToNode { - LogicalPlanNode input = 1; - string output_url = 2; - oneof format_options { - datafusion_common.CsvOptions csv = 8; - datafusion_common.JsonOptions json = 9; - datafusion_common.TableParquetOptions parquet = 10; - datafusion_common.AvroOptions avro = 11; - datafusion_common.ArrowOptions arrow = 12; - } - repeated string partition_by = 7; + LogicalPlanNode input = 1; + string output_url = 2; + bytes file_type = 3; + repeated string partition_by = 7; } message UnnestNode { - LogicalPlanNode input = 1; - repeated datafusion_common.Column exec_columns = 2; - repeated uint64 list_type_columns = 3; - repeated uint64 struct_type_columns = 4; - repeated uint64 dependency_indices = 5; - datafusion_common.DfSchema schema = 6; - UnnestOptions options = 7; + LogicalPlanNode input = 1; + repeated ColumnUnnestExec exec_columns = 2; + repeated ColumnUnnestListItem list_type_columns = 3; + repeated uint64 struct_type_columns = 4; + repeated uint64 dependency_indices = 5; + datafusion_common.DfSchema schema = 6; + UnnestOptions options = 7; +} +message ColumnUnnestListItem { + uint32 input_index = 1; + ColumnUnnestListRecursion recursion = 2; +} + +message ColumnUnnestListRecursions { + repeated ColumnUnnestListRecursion recursions = 2; +} + +message ColumnUnnestListRecursion { + datafusion_common.Column output_column = 1; + uint32 depth = 2; +} + +message ColumnUnnestExec { + datafusion_common.Column column = 1; + oneof UnnestType { + ColumnUnnestListRecursions list = 2; + datafusion_common.EmptyMessage struct = 3; + datafusion_common.EmptyMessage inferred = 4; + } } message UnnestOptions { - bool preserve_nulls = 1; + bool preserve_nulls = 1; } message UnionNode { @@ -316,8 +337,6 @@ message LogicalExprNode { // binary expressions BinaryExprNode binary_expr = 4; - // aggregate expressions - AggregateExprNode aggregate_expr = 5; // null checks IsNull is_null_expr = 6; @@ -327,7 +346,6 @@ message LogicalExprNode { BetweenNode between = 9; CaseNode case_ = 10; CastNode cast = 11; - SortExprNode sort = 12; NegativeNode negative = 13; InListNode in_list = 14; Wildcard wildcard = 15; @@ -369,7 +387,7 @@ message LogicalExprNode { } message Wildcard { - string qualifier = 1; + TableReference qualifier = 1; } message PlaceholderNode { @@ -471,57 +489,14 @@ message InListNode { bool negated = 3; } -enum AggregateFunction { - MIN = 0; - MAX = 1; - SUM = 2; - AVG = 3; - COUNT = 4; - APPROX_DISTINCT = 5; - ARRAY_AGG = 6; - // VARIANCE = 7; - VARIANCE_POP = 8; - // COVARIANCE = 9; - // COVARIANCE_POP = 10; - STDDEV = 11; - STDDEV_POP = 12; - CORRELATION = 13; - APPROX_PERCENTILE_CONT = 14; - APPROX_MEDIAN = 15; - APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16; - GROUPING = 17; - // MEDIAN = 18; - BIT_AND = 19; - BIT_OR = 20; - BIT_XOR = 21; - BOOL_AND = 22; - BOOL_OR = 23; - REGR_SLOPE = 26; - REGR_INTERCEPT = 27; - REGR_COUNT = 28; - REGR_R2 = 29; - REGR_AVGX = 30; - REGR_AVGY = 31; - REGR_SXX = 32; - REGR_SYY = 33; - REGR_SXY = 34; - STRING_AGG = 35; - NTH_VALUE_AGG = 36; -} - -message AggregateExprNode { - AggregateFunction aggr_function = 1; - repeated LogicalExprNode expr = 2; - bool distinct = 3; - LogicalExprNode filter = 4; - repeated LogicalExprNode order_by = 5; -} message AggregateUDFExprNode { string fun_name = 1; repeated LogicalExprNode args = 2; + bool distinct = 5; LogicalExprNode filter = 3; - repeated LogicalExprNode order_by = 4; + repeated SortExprNode order_by = 4; + optional bytes fun_definition = 6; } message ScalarUDFExprNode { @@ -531,7 +506,8 @@ message ScalarUDFExprNode { } enum BuiltInWindowFunction { - ROW_NUMBER = 0; + UNSPECIFIED = 0; // https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum + // ROW_NUMBER = 0; RANK = 1; DENSE_RANK = 2; PERCENT_RANK = 3; @@ -546,16 +522,16 @@ enum BuiltInWindowFunction { message WindowExprNode { oneof window_function { - AggregateFunction aggr_function = 1; BuiltInWindowFunction built_in_function = 2; string udaf = 3; string udwf = 9; } LogicalExprNode expr = 4; repeated LogicalExprNode partition_by = 5; - repeated LogicalExprNode order_by = 6; + repeated SortExprNode order_by = 6; // repeated LogicalExprNode filter = 7; WindowFrame window_frame = 8; + optional bytes fun_definition = 10; } message BetweenNode { @@ -674,9 +650,11 @@ message PlanType { datafusion_common.EmptyMessage FinalLogicalPlan = 3; datafusion_common.EmptyMessage InitialPhysicalPlan = 4; datafusion_common.EmptyMessage InitialPhysicalPlanWithStats = 9; + datafusion_common.EmptyMessage InitialPhysicalPlanWithSchema = 11; OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5; datafusion_common.EmptyMessage FinalPhysicalPlan = 6; datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10; + datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12; } } @@ -737,10 +715,11 @@ message PhysicalPlanNode { AnalyzeExecNode analyze = 23; JsonSinkExecNode json_sink = 24; SymmetricHashJoinExecNode symmetric_hash_join = 25; - InterleaveExecNode interleave = 26; + InterleaveExecNode interleave = 26; PlaceholderRowExecNode placeholder_row = 27; CsvSinkExecNode csv_sink = 28; ParquetSinkExecNode parquet_sink = 29; + UnnestExecNode unnest = 30; } } @@ -752,13 +731,21 @@ message PartitionColumn { message FileSinkConfig { reserved 6; // writer_mode + reserved 8; // was `overwrite` which has been superseded by `insert_op` string object_store_url = 1; repeated PartitionedFile file_groups = 2; repeated string table_paths = 3; datafusion_common.Schema output_schema = 4; repeated PartitionColumn table_partition_cols = 5; - bool overwrite = 8; + bool keep_partition_by_columns = 9; + InsertOp insert_op = 10; +} + +enum InsertOp { + Append = 0; + Overwrite = 1; + Replace = 2; } message JsonSink { @@ -797,6 +784,19 @@ message ParquetSinkExecNode { PhysicalSortExprNodeCollection sort_order = 4; } +message UnnestExecNode { + PhysicalPlanNode input = 1; + datafusion_common.Schema schema = 2; + repeated ListUnnest list_type_columns = 3; + repeated uint64 struct_type_columns = 4; + UnnestOptions options = 5; +} + +message ListUnnest { + uint32 index_in_input_schema = 1; + uint32 depth = 2; +} + message PhysicalExtensionNode { bytes node = 1; repeated PhysicalPlanNode inputs = 2; @@ -838,6 +838,8 @@ message PhysicalExprNode { // was PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17; PhysicalLikeExprNode like_expr = 18; + + PhysicalExtensionExprNode extension = 19; } } @@ -850,17 +852,17 @@ message PhysicalScalarUdfNode { message PhysicalAggregateExprNode { oneof AggregateFunction { - AggregateFunction aggr_function = 1; string user_defined_aggr_function = 4; } repeated PhysicalExprNode expr = 2; repeated PhysicalSortExprNode ordering_req = 5; bool distinct = 3; + bool ignore_nulls = 6; + optional bytes fun_definition = 7; } message PhysicalWindowExprNode { oneof window_function { - AggregateFunction aggr_function = 1; BuiltInWindowFunction built_in_function = 2; string user_defined_aggr_function = 3; } @@ -869,6 +871,7 @@ message PhysicalWindowExprNode { repeated PhysicalSortExprNode order_by = 6; WindowFrame window_frame = 7; string name = 8; + optional bytes fun_definition = 9; } message PhysicalIsNull { @@ -944,10 +947,16 @@ message PhysicalNegativeNode { PhysicalExprNode expr = 1; } +message PhysicalExtensionExprNode { + bytes expr = 1; + repeated PhysicalExprNode inputs = 2; +} + message FilterExecNode { PhysicalPlanNode input = 1; PhysicalExprNode expr = 2; uint32 default_filter_selectivity = 3; + repeated uint32 projection = 9; } message FileGroup { @@ -994,6 +1003,10 @@ message CsvScanExecNode { oneof optional_escape { string escape = 5; } + oneof optional_comment { + string comment = 6; + } + bool newlines_in_values = 7; } message AvroScanExecNode { @@ -1174,6 +1187,7 @@ message NestedLoopJoinExecNode { message CoalesceBatchesExecNode { PhysicalPlanNode input = 1; uint32 target_batch_size = 2; + optional uint32 fetch = 3; } message CoalescePartitionsExecNode { @@ -1233,4 +1247,4 @@ message PartitionStats { int64 num_batches = 2; int64 num_bytes = 3; repeated datafusion_common.ColumnStats column_stats = 4; -} +} \ No newline at end of file diff --git a/ballista/core/proto/datafusion_common.proto b/ballista/core/proto/datafusion_common.proto index d9ec7dbb5..c3906abf7 100644 --- a/ballista/core/proto/datafusion_common.proto +++ b/ballista/core/proto/datafusion_common.proto @@ -51,6 +51,11 @@ message ParquetFormat { message AvroFormat {} +message NdJsonFormat { + JsonOptions options = 1; +} + + message PrimaryKeyConstraint{ repeated uint64 indices = 1; } @@ -130,6 +135,12 @@ message Decimal{ int32 scale = 4; } +message Decimal256Type{ + reserved 1, 2; + uint32 precision = 3; + int32 scale = 4; +} + message List{ Field field_type = 1; } @@ -164,7 +175,7 @@ message Union{ repeated int32 type_ids = 3; } -// Used for List/FixedSizeList/LargeList/Struct +// Used for List/FixedSizeList/LargeList/Struct/Map message ScalarNestedValue { message Dictionary { bytes ipc_message = 1; @@ -248,6 +259,7 @@ message ScalarValue{ bool bool_value = 1; string utf8_value = 2; string large_utf8_value = 3; + string utf8_view_value = 23; int32 int8_value = 4; int32 int16_value = 5; int32 int32_value = 6; @@ -265,6 +277,7 @@ message ScalarValue{ ScalarNestedValue list_value = 17; ScalarNestedValue fixed_size_list_value = 18; ScalarNestedValue struct_value = 32; + ScalarNestedValue map_value = 41; Decimal128 decimal128_value = 20; Decimal256 decimal256_value = 39; @@ -281,6 +294,7 @@ message ScalarValue{ ScalarDictionaryValue dictionary_value = 27; bytes binary_value = 28; bytes large_binary_value = 29; + bytes binary_view_value = 22; ScalarTime64Value time64_value = 30; IntervalDayTimeValue interval_daytime_value = 25; IntervalMonthDayNanoValue interval_month_day_nano = 31; @@ -318,8 +332,10 @@ message ArrowType{ EmptyMessage FLOAT32 = 12 ; EmptyMessage FLOAT64 = 13 ; EmptyMessage UTF8 = 14 ; + EmptyMessage UTF8_VIEW = 35; EmptyMessage LARGE_UTF8 = 32; EmptyMessage BINARY = 15 ; + EmptyMessage BINARY_VIEW = 34; int32 FIXED_SIZE_BINARY = 16 ; EmptyMessage LARGE_BINARY = 31; EmptyMessage DATE32 = 17 ; @@ -330,6 +346,7 @@ message ArrowType{ TimeUnit TIME64 = 22 ; IntervalUnit INTERVAL = 23 ; Decimal DECIMAL = 24 ; + Decimal256Type DECIMAL256 = 36; List LIST = 25; List LARGE_LIST = 26; FixedSizeList FIXED_SIZE_LIST = 27; @@ -381,6 +398,12 @@ message CsvWriterOptions { string time_format = 7; // Optional value to represent null string null_value = 8; + // Optional quote. Defaults to `b'"'` + string quote = 9; + // Optional escape. Defaults to `'\\'` + string escape = 10; + // Optional flag whether to double quotes, instead of escaping. Defaults to `true` + bool double_quote = 11; } // Options controlling CSV format @@ -397,6 +420,10 @@ message CsvOptions { string timestamp_tz_format = 10; // Optional timestamp with timezone format string time_format = 11; // Optional time format string null_value = 12; // Optional representation of null value + bytes comment = 13; // Optional comment character as a byte + bytes double_quote = 14; // Indicates if quotes are doubled + bytes newlines_in_values = 15; // Indicates if newlines are supported in values + bytes terminator = 16; // Optional terminator character as a byte } // Options controlling CSV format @@ -407,15 +434,16 @@ message JsonOptions { message TableParquetOptions { ParquetOptions global = 1; - repeated ColumnSpecificOptions column_specific_options = 2; + repeated ParquetColumnSpecificOptions column_specific_options = 2; + map key_value_metadata = 3; } -message ColumnSpecificOptions { +message ParquetColumnSpecificOptions { string column_name = 1; - ColumnOptions options = 2; + ParquetColumnOptions options = 2; } -message ColumnOptions { +message ParquetColumnOptions { oneof bloom_filter_enabled_opt { bool bloom_filter_enabled = 1; } @@ -465,6 +493,7 @@ message ParquetOptions { uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2 bool bloom_filter_on_read = 26; // default = true bool bloom_filter_on_write = 27; // default = false + bool schema_force_view_types = 28; // default = false oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; @@ -538,4 +567,4 @@ message ColumnStats { Precision max_value = 2; Precision null_count = 3; Precision distinct_count = 4; -} +} \ No newline at end of file diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index 2f856b394..4a2c25b87 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -419,6 +419,7 @@ fn fetch_partition_local_inner( let file = File::open(path).map_err(|e| { BallistaError::General(format!("Failed to open partition file at {path}: {e:?}")) })?; + let file = BufReader::new(file); let reader = StreamReader::try_new(file, None).map_err(|e| { BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}")) })?; diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs index c45bdeafc..51a7b80be 100644 --- a/ballista/core/src/serde/generated/ballista.rs +++ b/ballista/core/src/serde/generated/ballista.rs @@ -2,7 +2,6 @@ /// ///////////////////////////////////////////////////////////////////////////////////////////////// /// Ballista Physical Plan /// ///////////////////////////////////////////////////////////////////////////////////////////////// -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct BallistaPhysicalPlanNode { #[prost(oneof = "ballista_physical_plan_node::PhysicalPlanType", tags = "1, 2, 3")] @@ -12,7 +11,6 @@ pub struct BallistaPhysicalPlanNode { } /// Nested message and enum types in `BallistaPhysicalPlanNode`. pub mod ballista_physical_plan_node { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum PhysicalPlanType { #[prost(message, tag = "1")] @@ -23,7 +21,6 @@ pub mod ballista_physical_plan_node { UnresolvedShuffle(super::UnresolvedShuffleExecNode), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ShuffleWriterExecNode { /// TODO it seems redundant to provide job and stage id here since we also have them @@ -39,7 +36,6 @@ pub struct ShuffleWriterExecNode { ::datafusion_proto::protobuf::PhysicalHashRepartition, >, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct UnresolvedShuffleExecNode { #[prost(uint32, tag = "1")] @@ -49,7 +45,6 @@ pub struct UnresolvedShuffleExecNode { #[prost(uint32, tag = "4")] pub output_partition_count: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ShuffleReaderExecNode { #[prost(message, repeated, tag = "1")] @@ -60,7 +55,6 @@ pub struct ShuffleReaderExecNode { #[prost(uint32, tag = "3")] pub stage_id: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ShuffleReaderPartition { /// each partition of a shuffle read can read data from multiple locations @@ -70,7 +64,6 @@ pub struct ShuffleReaderPartition { /// ///////////////////////////////////////////////////////////////////////////////////////////////// /// Ballista Scheduling /// ///////////////////////////////////////////////////////////////////////////////////////////////// -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutionGraph { #[prost(string, tag = "1")] @@ -100,7 +93,6 @@ pub struct ExecutionGraph { #[prost(uint64, tag = "13")] pub queued_at: u64, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct StageAttempts { #[prost(uint32, tag = "1")] @@ -108,7 +100,6 @@ pub struct StageAttempts { #[prost(uint32, repeated, tag = "2")] pub stage_attempt_num: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutionGraphStage { #[prost(oneof = "execution_graph_stage::StageType", tags = "1, 2, 3, 4")] @@ -116,7 +107,6 @@ pub struct ExecutionGraphStage { } /// Nested message and enum types in `ExecutionGraphStage`. pub mod execution_graph_stage { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum StageType { #[prost(message, tag = "1")] @@ -129,7 +119,6 @@ pub mod execution_graph_stage { FailedStage(super::FailedStage), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct UnResolvedStage { #[prost(uint32, tag = "1")] @@ -147,7 +136,6 @@ pub struct UnResolvedStage { ::prost::alloc::string::String, >, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ResolvedStage { #[prost(uint32, tag = "1")] @@ -167,7 +155,6 @@ pub struct ResolvedStage { ::prost::alloc::string::String, >, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SuccessfulStage { #[prost(uint32, tag = "1")] @@ -187,7 +174,6 @@ pub struct SuccessfulStage { #[prost(uint32, tag = "9")] pub stage_attempt_num: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FailedStage { #[prost(uint32, tag = "1")] @@ -207,7 +193,6 @@ pub struct FailedStage { #[prost(uint32, tag = "9")] pub stage_attempt_num: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct TaskInfo { #[prost(uint32, tag = "1")] @@ -234,7 +219,6 @@ pub struct TaskInfo { } /// Nested message and enum types in `TaskInfo`. pub mod task_info { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Status { #[prost(message, tag = "8")] @@ -245,7 +229,6 @@ pub mod task_info { Successful(super::SuccessfulTask), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GraphStageInput { #[prost(uint32, tag = "1")] @@ -255,7 +238,6 @@ pub struct GraphStageInput { #[prost(bool, tag = "3")] pub complete: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct TaskInputPartitions { #[prost(uint32, tag = "1")] @@ -263,7 +245,6 @@ pub struct TaskInputPartitions { #[prost(message, repeated, tag = "2")] pub partition_location: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct KeyValuePair { #[prost(string, tag = "1")] @@ -271,7 +252,6 @@ pub struct KeyValuePair { #[prost(string, tag = "2")] pub value: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Action { /// configuration settings @@ -282,7 +262,6 @@ pub struct Action { } /// Nested message and enum types in `Action`. pub mod action { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum ActionType { /// Fetch a partition from an executor @@ -290,7 +269,6 @@ pub mod action { FetchPartition(super::FetchPartition), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutePartition { #[prost(string, tag = "1")] @@ -310,7 +288,6 @@ pub struct ExecutePartition { ::datafusion_proto::protobuf::PhysicalHashRepartition, >, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FetchPartition { #[prost(string, tag = "1")] @@ -326,7 +303,6 @@ pub struct FetchPartition { #[prost(uint32, tag = "6")] pub port: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PartitionLocation { /// partition_id of the map stage who produces the shuffle. @@ -343,7 +319,6 @@ pub struct PartitionLocation { pub path: ::prost::alloc::string::String, } /// Unique identifier for a materialized partition of data -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PartitionId { #[prost(string, tag = "1")] @@ -353,8 +328,7 @@ pub struct PartitionId { #[prost(uint32, tag = "4")] pub partition_id: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct TaskId { #[prost(uint32, tag = "1")] pub task_id: u32, @@ -363,7 +337,6 @@ pub struct TaskId { #[prost(uint32, tag = "3")] pub partition_id: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PartitionStats { #[prost(int64, tag = "1")] @@ -375,7 +348,6 @@ pub struct PartitionStats { #[prost(message, repeated, tag = "4")] pub column_stats: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ColumnStats { #[prost(message, optional, tag = "1")] @@ -387,13 +359,11 @@ pub struct ColumnStats { #[prost(uint32, tag = "4")] pub distinct_count: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct OperatorMetricsSet { #[prost(message, repeated, tag = "1")] pub metrics: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct NamedCount { #[prost(string, tag = "1")] @@ -401,7 +371,6 @@ pub struct NamedCount { #[prost(uint64, tag = "2")] pub value: u64, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct NamedGauge { #[prost(string, tag = "1")] @@ -409,7 +378,6 @@ pub struct NamedGauge { #[prost(uint64, tag = "2")] pub value: u64, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct NamedTime { #[prost(string, tag = "1")] @@ -417,7 +385,6 @@ pub struct NamedTime { #[prost(uint64, tag = "2")] pub value: u64, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct OperatorMetric { #[prost( @@ -428,7 +395,6 @@ pub struct OperatorMetric { } /// Nested message and enum types in `OperatorMetric`. pub mod operator_metric { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Metric { #[prost(uint64, tag = "1")] @@ -456,7 +422,6 @@ pub mod operator_metric { } } /// Used by scheduler -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorMetadata { #[prost(string, tag = "1")] @@ -471,7 +436,6 @@ pub struct ExecutorMetadata { pub specification: ::core::option::Option, } /// Used by grpc -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorRegistration { #[prost(string, tag = "1")] @@ -491,14 +455,12 @@ pub struct ExecutorRegistration { pub mod executor_registration { /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see and ) /// this syntax is ugly but is binary compatible with the "optional" keyword (see ) - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum OptionalHost { #[prost(string, tag = "2")] Host(::prost::alloc::string::String), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorHeartbeat { #[prost(string, tag = "1")] @@ -511,8 +473,7 @@ pub struct ExecutorHeartbeat { #[prost(message, optional, tag = "4")] pub status: ::core::option::Option, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutorMetric { /// TODO add more metrics #[prost(oneof = "executor_metric::Metric", tags = "1")] @@ -521,14 +482,12 @@ pub struct ExecutorMetric { /// Nested message and enum types in `ExecutorMetric`. pub mod executor_metric { /// TODO add more metrics - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Metric { #[prost(uint64, tag = "1")] AvailableMemory(u64), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorStatus { #[prost(oneof = "executor_status::Status", tags = "1, 2, 3, 4")] @@ -536,7 +495,6 @@ pub struct ExecutorStatus { } /// Nested message and enum types in `ExecutorStatus`. pub mod executor_status { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Status { #[prost(string, tag = "1")] @@ -549,14 +507,12 @@ pub mod executor_status { Terminating(::prost::alloc::string::String), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorSpecification { #[prost(message, repeated, tag = "1")] pub resources: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutorResource { /// TODO add more resources #[prost(oneof = "executor_resource::Resource", tags = "1")] @@ -565,14 +521,12 @@ pub struct ExecutorResource { /// Nested message and enum types in `ExecutorResource`. pub mod executor_resource { /// TODO add more resources - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Resource { #[prost(uint32, tag = "1")] TaskSlots(u32), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct AvailableTaskSlots { #[prost(string, tag = "1")] @@ -580,13 +534,11 @@ pub struct AvailableTaskSlots { #[prost(uint32, tag = "2")] pub slots: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorTaskSlots { #[prost(message, repeated, tag = "1")] pub task_slots: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorData { #[prost(string, tag = "1")] @@ -594,21 +546,18 @@ pub struct ExecutorData { #[prost(message, repeated, tag = "2")] pub resources: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutorResourcePair { #[prost(message, optional, tag = "1")] pub total: ::core::option::Option, #[prost(message, optional, tag = "2")] pub available: ::core::option::Option, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RunningTask { #[prost(string, tag = "1")] pub executor_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FailedTask { #[prost(string, tag = "1")] @@ -623,7 +572,6 @@ pub struct FailedTask { } /// Nested message and enum types in `FailedTask`. pub mod failed_task { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum FailedReason { #[prost(message, tag = "4")] @@ -641,7 +589,6 @@ pub mod failed_task { TaskKilled(super::TaskKilled), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SuccessfulTask { #[prost(string, tag = "1")] @@ -651,10 +598,8 @@ pub struct SuccessfulTask { #[prost(message, repeated, tag = "2")] pub partitions: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutionError {} -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FetchPartitionError { #[prost(string, tag = "1")] @@ -664,19 +609,14 @@ pub struct FetchPartitionError { #[prost(uint32, tag = "3")] pub map_partition_id: u32, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct IoError {} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutorLost {} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ResultLost {} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct TaskKilled {} -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ShuffleWritePartition { #[prost(uint64, tag = "1")] @@ -690,7 +630,6 @@ pub struct ShuffleWritePartition { #[prost(uint64, tag = "5")] pub num_bytes: u64, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct TaskStatus { #[prost(uint32, tag = "1")] @@ -716,7 +655,6 @@ pub struct TaskStatus { } /// Nested message and enum types in `TaskStatus`. pub mod task_status { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Status { #[prost(message, tag = "9")] @@ -727,7 +665,6 @@ pub mod task_status { Successful(super::SuccessfulTask), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PollWorkParams { #[prost(message, optional, tag = "1")] @@ -738,7 +675,6 @@ pub struct PollWorkParams { #[prost(message, repeated, tag = "3")] pub task_status: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct TaskDefinition { #[prost(uint32, tag = "1")] @@ -763,7 +699,6 @@ pub struct TaskDefinition { pub props: ::prost::alloc::vec::Vec, } /// A set of tasks in the same stage -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct MultiTaskDefinition { #[prost(message, repeated, tag = "1")] @@ -783,13 +718,11 @@ pub struct MultiTaskDefinition { #[prost(message, repeated, tag = "9")] pub props: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SessionSettings { #[prost(message, repeated, tag = "1")] pub configs: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct JobSessionConfig { #[prost(string, tag = "1")] @@ -797,25 +730,21 @@ pub struct JobSessionConfig { #[prost(message, repeated, tag = "2")] pub configs: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PollWorkResult { #[prost(message, repeated, tag = "1")] pub tasks: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RegisterExecutorParams { #[prost(message, optional, tag = "1")] pub metadata: ::core::option::Option, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct RegisterExecutorResult { #[prost(bool, tag = "1")] pub success: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct HeartBeatParams { #[prost(string, tag = "1")] @@ -827,14 +756,12 @@ pub struct HeartBeatParams { #[prost(message, optional, tag = "4")] pub metadata: ::core::option::Option, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct HeartBeatResult { /// TODO it's from Spark for BlockManager #[prost(bool, tag = "1")] pub reregister: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct StopExecutorParams { #[prost(string, tag = "1")] @@ -846,10 +773,8 @@ pub struct StopExecutorParams { #[prost(bool, tag = "3")] pub force: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct StopExecutorResult {} -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorStoppedParams { #[prost(string, tag = "1")] @@ -858,10 +783,8 @@ pub struct ExecutorStoppedParams { #[prost(string, tag = "2")] pub reason: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutorStoppedResult {} -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct UpdateTaskStatusParams { #[prost(string, tag = "1")] @@ -870,13 +793,11 @@ pub struct UpdateTaskStatusParams { #[prost(message, repeated, tag = "2")] pub task_status: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct UpdateTaskStatusResult { #[prost(bool, tag = "1")] pub success: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecuteQueryParams { #[prost(message, repeated, tag = "4")] @@ -890,7 +811,6 @@ pub struct ExecuteQueryParams { } /// Nested message and enum types in `ExecuteQueryParams`. pub mod execute_query_params { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Query { #[prost(bytes, tag = "1")] @@ -898,26 +818,22 @@ pub mod execute_query_params { #[prost(string, tag = "2")] Sql(::prost::alloc::string::String), } - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum OptionalSessionId { #[prost(string, tag = "3")] SessionId(::prost::alloc::string::String), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CreateSessionParams { #[prost(message, repeated, tag = "1")] pub settings: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CreateSessionResult { #[prost(string, tag = "1")] pub session_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct UpdateSessionParams { #[prost(string, tag = "1")] @@ -925,31 +841,26 @@ pub struct UpdateSessionParams { #[prost(message, repeated, tag = "2")] pub settings: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct UpdateSessionResult { #[prost(bool, tag = "1")] pub success: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RemoveSessionParams { #[prost(string, tag = "1")] pub session_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct RemoveSessionResult { #[prost(bool, tag = "1")] pub success: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecuteSqlParams { #[prost(string, tag = "1")] pub sql: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecuteQueryResult { #[prost(oneof = "execute_query_result::Result", tags = "1, 2")] @@ -957,7 +868,6 @@ pub struct ExecuteQueryResult { } /// Nested message and enum types in `ExecuteQueryResult`. pub mod execute_query_result { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "1")] @@ -966,7 +876,6 @@ pub mod execute_query_result { Failure(super::ExecuteQueryFailureResult), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecuteQuerySuccessResult { #[prost(string, tag = "1")] @@ -974,7 +883,6 @@ pub struct ExecuteQuerySuccessResult { #[prost(string, tag = "2")] pub session_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecuteQueryFailureResult { #[prost(oneof = "execute_query_failure_result::Failure", tags = "1, 2, 3")] @@ -982,7 +890,6 @@ pub struct ExecuteQueryFailureResult { } /// Nested message and enum types in `ExecuteQueryFailureResult`. pub mod execute_query_failure_result { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Failure { #[prost(string, tag = "1")] @@ -993,13 +900,11 @@ pub mod execute_query_failure_result { SqlParsingFailure(::prost::alloc::string::String), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetJobStatusParams { #[prost(string, tag = "1")] pub job_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SuccessfulJob { #[prost(message, repeated, tag = "1")] @@ -1011,14 +916,12 @@ pub struct SuccessfulJob { #[prost(uint64, tag = "4")] pub ended_at: u64, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct QueuedJob { #[prost(uint64, tag = "1")] pub queued_at: u64, } /// TODO: add progress report -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RunningJob { #[prost(uint64, tag = "1")] @@ -1028,7 +931,6 @@ pub struct RunningJob { #[prost(string, tag = "3")] pub scheduler: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FailedJob { #[prost(string, tag = "1")] @@ -1040,7 +942,6 @@ pub struct FailedJob { #[prost(uint64, tag = "4")] pub ended_at: u64, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct JobStatus { #[prost(string, tag = "5")] @@ -1052,7 +953,6 @@ pub struct JobStatus { } /// Nested message and enum types in `JobStatus`. pub mod job_status { - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Status { #[prost(message, tag = "1")] @@ -1065,13 +965,11 @@ pub mod job_status { Successful(super::SuccessfulJob), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetJobStatusResult { #[prost(message, optional, tag = "1")] pub status: ::core::option::Option, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetFileMetadataParams { #[prost(string, tag = "1")] @@ -1079,40 +977,33 @@ pub struct GetFileMetadataParams { #[prost(string, tag = "2")] pub file_type: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetFileMetadataResult { #[prost(message, optional, tag = "1")] pub schema: ::core::option::Option<::datafusion_proto_common::Schema>, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FilePartitionMetadata { #[prost(string, repeated, tag = "1")] pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CancelJobParams { #[prost(string, tag = "1")] pub job_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CancelJobResult { #[prost(bool, tag = "1")] pub cancelled: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CleanJobDataParams { #[prost(string, tag = "1")] pub job_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CleanJobDataResult {} -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct LaunchTaskParams { /// Allow to launch a task set to an executor at once @@ -1121,7 +1012,6 @@ pub struct LaunchTaskParams { #[prost(string, tag = "2")] pub scheduler_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct LaunchMultiTaskParams { /// Allow to launch a task set to an executor at once @@ -1130,42 +1020,35 @@ pub struct LaunchMultiTaskParams { #[prost(string, tag = "2")] pub scheduler_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct LaunchTaskResult { /// TODO when part of the task set are scheduled successfully #[prost(bool, tag = "1")] pub success: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct LaunchMultiTaskResult { /// TODO when part of the task set are scheduled successfully #[prost(bool, tag = "1")] pub success: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CancelTasksParams { #[prost(message, repeated, tag = "1")] pub task_infos: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CancelTasksResult { #[prost(bool, tag = "1")] pub cancelled: bool, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RemoveJobDataParams { #[prost(string, tag = "1")] pub job_id: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct RemoveJobDataResult {} -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RunningTaskInfo { #[prost(uint32, tag = "1")] @@ -1179,7 +1062,13 @@ pub struct RunningTaskInfo { } /// Generated client implementations. pub mod scheduler_grpc_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; use tonic::codegen::http::Uri; #[derive(Debug, Clone)] @@ -1201,8 +1090,8 @@ pub mod scheduler_grpc_client { where T: tonic::client::GrpcService, T::Error: Into, - T::ResponseBody: Body + Send + 'static, - ::Error: Into + Send, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); @@ -1227,7 +1116,7 @@ pub mod scheduler_grpc_client { >, , - >>::Error: Into + Send + Sync, + >>::Error: Into + std::marker::Send + std::marker::Sync, { SchedulerGrpcClient::new(InterceptedService::new(inner, interceptor)) } @@ -1271,8 +1160,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1296,8 +1184,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1328,8 +1215,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1358,8 +1244,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1388,8 +1273,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1415,8 +1299,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1442,8 +1325,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1469,8 +1351,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1496,8 +1377,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1523,8 +1403,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1551,8 +1430,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1578,8 +1456,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1603,8 +1480,7 @@ pub mod scheduler_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1623,7 +1499,13 @@ pub mod scheduler_grpc_client { } /// Generated client implementations. pub mod executor_grpc_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; use tonic::codegen::http::Uri; #[derive(Debug, Clone)] @@ -1645,8 +1527,8 @@ pub mod executor_grpc_client { where T: tonic::client::GrpcService, T::Error: Into, - T::ResponseBody: Body + Send + 'static, - ::Error: Into + Send, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); @@ -1671,7 +1553,7 @@ pub mod executor_grpc_client { >, , - >>::Error: Into + Send + Sync, + >>::Error: Into + std::marker::Send + std::marker::Sync, { ExecutorGrpcClient::new(InterceptedService::new(inner, interceptor)) } @@ -1717,8 +1599,7 @@ pub mod executor_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1742,8 +1623,7 @@ pub mod executor_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1769,8 +1649,7 @@ pub mod executor_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1796,8 +1675,7 @@ pub mod executor_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1823,8 +1701,7 @@ pub mod executor_grpc_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1843,11 +1720,17 @@ pub mod executor_grpc_client { } /// Generated server implementations. pub mod scheduler_grpc_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with SchedulerGrpcServer. #[async_trait] - pub trait SchedulerGrpc: Send + Sync + 'static { + pub trait SchedulerGrpc: std::marker::Send + std::marker::Sync + 'static { /// Executors must poll the scheduler for heartbeat and to receive tasks async fn poll_work( &self, @@ -1936,20 +1819,18 @@ pub mod scheduler_grpc_server { >; } #[derive(Debug)] - pub struct SchedulerGrpcServer { - inner: _Inner, + pub struct SchedulerGrpcServer { + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); - impl SchedulerGrpcServer { + impl SchedulerGrpcServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -1999,8 +1880,8 @@ pub mod scheduler_grpc_server { impl tonic::codegen::Service> for SchedulerGrpcServer where T: SchedulerGrpc, - B: Body + Send + 'static, - B::Error: Into + Send + 'static, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, { type Response = http::Response; type Error = std::convert::Infallible; @@ -2012,7 +1893,6 @@ pub mod scheduler_grpc_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/ballista.protobuf.SchedulerGrpc/PollWork" => { #[allow(non_camel_case_types)] @@ -2043,7 +1923,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = PollWorkSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2090,7 +1969,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = RegisterExecutorSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2140,7 +2018,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = HeartBeatFromExecutorSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2187,7 +2064,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = UpdateTaskStatusSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2234,7 +2110,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetFileMetadataSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2280,7 +2155,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = CreateSessionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2326,7 +2200,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = UpdateSessionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2372,7 +2245,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = RemoveSessionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2418,7 +2290,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExecuteQuerySvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2464,7 +2335,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetJobStatusSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2511,7 +2381,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExecutorStoppedSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2557,7 +2426,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = CancelJobSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2603,7 +2471,6 @@ pub mod scheduler_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = CleanJobDataSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2622,20 +2489,25 @@ pub mod scheduler_grpc_server { } _ => { Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) }) } } } } - impl Clone for SchedulerGrpcServer { + impl Clone for SchedulerGrpcServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -2647,27 +2519,25 @@ pub mod scheduler_grpc_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::server::NamedService for SchedulerGrpcServer { - const NAME: &'static str = "ballista.protobuf.SchedulerGrpc"; + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "ballista.protobuf.SchedulerGrpc"; + impl tonic::server::NamedService for SchedulerGrpcServer { + const NAME: &'static str = SERVICE_NAME; } } /// Generated server implementations. pub mod executor_grpc_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with ExecutorGrpcServer. #[async_trait] - pub trait ExecutorGrpc: Send + Sync + 'static { + pub trait ExecutorGrpc: std::marker::Send + std::marker::Sync + 'static { async fn launch_task( &self, request: tonic::Request, @@ -2705,20 +2575,18 @@ pub mod executor_grpc_server { >; } #[derive(Debug)] - pub struct ExecutorGrpcServer { - inner: _Inner, + pub struct ExecutorGrpcServer { + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); - impl ExecutorGrpcServer { + impl ExecutorGrpcServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -2768,8 +2636,8 @@ pub mod executor_grpc_server { impl tonic::codegen::Service> for ExecutorGrpcServer where T: ExecutorGrpc, - B: Body + Send + 'static, - B::Error: Into + Send + 'static, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, { type Response = http::Response; type Error = std::convert::Infallible; @@ -2781,7 +2649,6 @@ pub mod executor_grpc_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/ballista.protobuf.ExecutorGrpc/LaunchTask" => { #[allow(non_camel_case_types)] @@ -2812,7 +2679,6 @@ pub mod executor_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = LaunchTaskSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2859,7 +2725,6 @@ pub mod executor_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = LaunchMultiTaskSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2905,7 +2770,6 @@ pub mod executor_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = StopExecutorSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2951,7 +2815,6 @@ pub mod executor_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = CancelTasksSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -2997,7 +2860,6 @@ pub mod executor_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = RemoveJobDataSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3016,20 +2878,25 @@ pub mod executor_grpc_server { } _ => { Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) }) } } } } - impl Clone for ExecutorGrpcServer { + impl Clone for ExecutorGrpcServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -3041,17 +2908,9 @@ pub mod executor_grpc_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::server::NamedService for ExecutorGrpcServer { - const NAME: &'static str = "ballista.protobuf.ExecutorGrpc"; + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "ballista.protobuf.ExecutorGrpc"; + impl tonic::server::NamedService for ExecutorGrpcServer { + const NAME: &'static str = SERVICE_NAME; } } diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs index 2bb555d1a..fce4a399e 100644 --- a/ballista/core/src/serde/mod.rs +++ b/ballista/core/src/serde/mod.rs @@ -309,7 +309,7 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { Some(datafusion_proto::protobuf::PhysicalHashRepartition { hash_expr: exprs .iter() - .map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(), &default_codec)) + .map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(&expr.clone(), &default_codec)) .collect::, DataFusionError>>()?, partition_count: *partition_count as u64, }) diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs index f6a878fa9..29b00dd78 100644 --- a/ballista/core/src/serde/scheduler/to_proto.rs +++ b/ballista/core/src/serde/scheduler/to_proto.rs @@ -106,7 +106,7 @@ pub fn hash_partitioning_to_proto( Ok(Some(datafusion_protobuf::PhysicalHashRepartition { hash_expr: exprs .iter() - .map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(), &default_codec)) + .map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(&expr.clone(), &default_codec)) .collect::, DataFusionError>>()?, partition_count: *partition_count as u64, })) diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs index 43387f094..a96a752c2 100644 --- a/ballista/executor/src/flight_service.rs +++ b/ballista/executor/src/flight_service.rs @@ -38,7 +38,7 @@ use arrow_flight::{ use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch}; use futures::{Stream, StreamExt, TryStreamExt}; use log::{debug, info}; -use std::io::{Read, Seek}; +use std::io::{BufReader, Read, Seek}; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::error::SendError; use tokio::{sync::mpsc::Sender, task}; @@ -95,6 +95,7 @@ impl FlightService for BallistaFlightService { )) }) .map_err(|e| from_ballista_err(&e))?; + let file = BufReader::new(file); let reader = StreamReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?; diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index e6c4b3ad9..1367f17b2 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -43,7 +43,7 @@ prometheus-metrics = ["prometheus", "once_cell"] anyhow = "1" arrow-flight = { workspace = true } async-trait = { workspace = true } -axum = "0.6.20" +axum = "0.7.7" ballista-core = { path = "../core", version = "0.12.0", features = ["s3"] } base64 = { version = "0.22" } clap = { workspace = true } @@ -53,7 +53,7 @@ datafusion = { workspace = true } datafusion-proto = { workspace = true } futures = { workspace = true } graphviz-rust = "0.9.0" -http = "0.2.9" +http = "1.1" log = { workspace = true } object_store = { workspace = true } once_cell = { version = "1.16.0", optional = true } diff --git a/ballista/scheduler/build.rs b/ballista/scheduler/build.rs index d0c8c270f..5a3e00cc1 100644 --- a/ballista/scheduler/build.rs +++ b/ballista/scheduler/build.rs @@ -24,6 +24,6 @@ fn main() -> Result<(), String> { println!("cargo:rerun-if-changed=proto/keda.proto"); tonic_build::configure() - .compile(&["proto/keda.proto"], &["proto"]) + .compile_protos(&["proto/keda.proto"], &["proto"]) .map_err(|e| format!("protobuf compilation failed: {e}")) } diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs index c6297d78c..2187db064 100644 --- a/ballista/scheduler/src/flight_sql.rs +++ b/ballista/scheduler/src/flight_sql.rs @@ -64,7 +64,9 @@ use datafusion::arrow; use datafusion::arrow::array::{ArrayRef, StringArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::error::ArrowError; -use datafusion::arrow::ipc::writer::{IpcDataGenerator, IpcWriteOptions}; +use datafusion::arrow::ipc::writer::{ + DictionaryTracker, IpcDataGenerator, IpcWriteOptions, +}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::DFSchemaRef; use datafusion::logical_expr::LogicalPlan; @@ -368,7 +370,15 @@ impl FlightSqlServiceImpl { let options = IpcWriteOptions::default(); let pair = SchemaAsIpc::new(&arrow_schema, &options); let data_gen = IpcDataGenerator::default(); - let encoded_data = data_gen.schema_to_bytes(pair.0, pair.1); + let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id( + false, + pair.1.preserve_dict_id(), + ); + let encoded_data = data_gen.schema_to_bytes_with_dictionary_tracker( + pair.0, + &mut dictionary_tracker, + pair.1, + ); let mut schema_bytes = vec![]; arrow::ipc::writer::write_message(&mut schema_bytes, encoded_data, pair.1) .map_err(|e| Status::internal(format!("Error encoding schema: {e}")))?; diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs index 140dc1412..5d1671a02 100644 --- a/ballista/scheduler/src/scheduler_process.rs +++ b/ballista/scheduler/src/scheduler_process.rs @@ -80,15 +80,15 @@ pub async fn start_server( FlightSqlServiceImpl::new(scheduler_server.clone()), )); - let tonic = tonic_builder.into_service().into_router(); + let tonic = tonic_builder.into_service().into_axum_router(); let axum = get_routes(Arc::new(scheduler_server)); let merged = axum .merge(tonic) .into_make_service_with_connect_info::(); - axum::Server::bind(&addr) - .serve(merged) + let listener = tokio::net::TcpListener::bind(&addr) .await - .map_err(Error::from) + .map_err(Error::from)?; + axum::serve(listener, merged).await.map_err(Error::from) } diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs index 4cf9d83fe..8880dfd11 100644 --- a/ballista/scheduler/src/state/session_manager.rs +++ b/ballista/scheduler/src/state/session_manager.rs @@ -66,7 +66,7 @@ pub fn create_datafusion_context( session_builder: SessionBuilder, ) -> Arc { let config = - SessionConfig::from_string_hash_map(ballista_config.settings().clone()).unwrap(); + SessionConfig::from_string_hash_map(&ballista_config.settings().clone()).unwrap(); let config = config .with_target_partitions(ballista_config.default_shuffle_partitions()) .with_batch_size(ballista_config.default_batch_size()) diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs index 5e5dee124..27bc0ec8b 100644 --- a/ballista/scheduler/src/test_utils.rs +++ b/ballista/scheduler/src/test_utils.rs @@ -950,7 +950,7 @@ pub async fn test_join_plan(partition: usize) -> ExecutionGraph { .build() .unwrap(); - let sort_expr = Expr::Sort(SortExpr::new(Box::new(col("id")), false, false)); + let sort_expr = SortExpr::new(col("id"), false, false); let logical_plan = left_plan .join(right_plan, JoinType::Inner, (vec!["id"], vec!["id"]), None) diff --git a/python/Cargo.toml b/python/Cargo.toml index eb662cb1e..758d162f3 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -33,11 +33,11 @@ publish = false async-trait = "0.1.77" ballista = { path = "../ballista/client", version = "0.12.0" } ballista-core = { path = "../ballista/core", version = "0.12.0" } -datafusion = "41.0.0" -datafusion-proto = "41.0.0" -datafusion-python = "41.0.0" +datafusion = { workspace = true } +datafusion-proto = { workspace = true } +datafusion-python = { workspace = true } -pyo3 = { version = "0.21", features = ["extension-module", "abi3", "abi3-py38"] } +pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] } pyo3-log = "0.11.0" tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync"] }