diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index be344eb58..4114bf31e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,10 +32,9 @@ jobs: distribution: 'temurin' java-version: '11' - name: Install Rust - uses: actions-rs/toolchain@v1 + uses: dtolnay/rust-toolchain@1.83 with: - toolchain: stable - override: true + components: clippy, rustfmt - name: Check Formatting run: cargo fmt -- --check - uses: actions/setup-python@v5 diff --git a/crates/arroyo-connectors/src/filesystem/sink/mod.rs b/crates/arroyo-connectors/src/filesystem/sink/mod.rs index 1d5c5bf4d..9c066c56e 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/mod.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/mod.rs @@ -1238,7 +1238,7 @@ impl MultipartManager { } }; if self.all_uploads_finished() { - return FileCheckpointData::MultiPartWriterUploadCompleted { + FileCheckpointData::MultiPartWriterUploadCompleted { multi_part_upload_id: multipart_id.clone(), completed_parts: self .pushed_parts @@ -1250,7 +1250,7 @@ impl MultipartManager { } }) .collect(), - }; + } } else { let in_flight_parts = self .pushed_parts diff --git a/crates/arroyo-connectors/src/nexmark/operator.rs b/crates/arroyo-connectors/src/nexmark/operator.rs index 8d942cb2c..ecc7ab127 100644 --- a/crates/arroyo-connectors/src/nexmark/operator.rs +++ b/crates/arroyo-connectors/src/nexmark/operator.rs @@ -457,7 +457,7 @@ impl GeneratorConfig { (1_000_000_000.0 / (nexmark_config.first_event_rate) * (nexmark_config.num_event_generators as f64)) as u64, ), - _step_length_second: (nexmark_config.rate_period_seconds + 2 - 1) / 2, + _step_length_second: nexmark_config.rate_period_seconds.div_ceil(2), base_time, first_event_id, max_events: nexmark_config.get_max_events(max_events), diff --git a/crates/arroyo-controller/src/states/mod.rs b/crates/arroyo-controller/src/states/mod.rs index c1986650a..865db5b6a 100644 --- a/crates/arroyo-controller/src/states/mod.rs +++ b/crates/arroyo-controller/src/states/mod.rs @@ -365,7 +365,7 @@ pub struct JobContext<'a> { metrics: Arc, JobMetrics>>>, } -impl<'a> JobContext<'a> { +impl JobContext<'_> { pub fn handle(&mut self, msg: JobMessage) -> Result<(), StateError> { if !matches!( msg, diff --git a/crates/arroyo-planner/src/builder.rs b/crates/arroyo-planner/src/builder.rs index 7b97b2a54..89251bb0d 100644 --- a/crates/arroyo-planner/src/builder.rs +++ b/crates/arroyo-planner/src/builder.rs @@ -263,7 +263,7 @@ impl ExtensionPlanner for ArroyoExtensionPlanner { } } -impl<'a> PlanToGraphVisitor<'a> { +impl PlanToGraphVisitor<'_> { fn add_index_to_traversal(&mut self, index: NodeIndex) { if let Some(last) = self.traversal.last_mut() { last.push(index); @@ -327,7 +327,7 @@ impl<'a> PlanToGraphVisitor<'a> { } } -impl<'a> TreeNodeVisitor<'_> for PlanToGraphVisitor<'a> { +impl TreeNodeVisitor<'_> for PlanToGraphVisitor<'_> { type Node = LogicalPlan; fn f_down(&mut self, node: &Self::Node) -> Result { diff --git a/crates/arroyo-planner/src/plan/aggregate.rs b/crates/arroyo-planner/src/plan/aggregate.rs index eae10375f..06abb0277 100644 --- a/crates/arroyo-planner/src/plan/aggregate.rs +++ b/crates/arroyo-planner/src/plan/aggregate.rs @@ -20,7 +20,7 @@ pub struct AggregateRewriter<'a> { pub schema_provider: &'a ArroyoSchemaProvider, } -impl<'a> AggregateRewriter<'a> { +impl AggregateRewriter<'_> { pub fn rewrite_non_windowed_aggregate( input: Arc, mut key_fields: Vec, @@ -119,7 +119,7 @@ impl<'a> AggregateRewriter<'a> { } } -impl<'a> TreeNodeRewriter for AggregateRewriter<'a> { +impl TreeNodeRewriter for AggregateRewriter<'_> { type Node = LogicalPlan; fn f_up(&mut self, node: Self::Node) -> Result> { diff --git a/crates/arroyo-planner/src/plan/join.rs b/crates/arroyo-planner/src/plan/join.rs index 2af85bb6a..9dea42c56 100644 --- a/crates/arroyo-planner/src/plan/join.rs +++ b/crates/arroyo-planner/src/plan/join.rs @@ -21,7 +21,7 @@ pub(crate) struct JoinRewriter<'a> { pub schema_provider: &'a ArroyoSchemaProvider, } -impl<'a> JoinRewriter<'a> { +impl JoinRewriter<'_> { fn check_join_windowing(join: &Join) -> Result { let left_window = WindowDetectingVisitor::get_window(&join.left)?; let right_window = WindowDetectingVisitor::get_window(&join.right)?; @@ -189,7 +189,7 @@ impl<'a> JoinRewriter<'a> { } } -impl<'a> TreeNodeRewriter for JoinRewriter<'a> { +impl TreeNodeRewriter for JoinRewriter<'_> { type Node = LogicalPlan; fn f_up(&mut self, node: Self::Node) -> Result> { diff --git a/crates/arroyo-planner/src/plan/mod.rs b/crates/arroyo-planner/src/plan/mod.rs index 2ac52999c..828c1e702 100644 --- a/crates/arroyo-planner/src/plan/mod.rs +++ b/crates/arroyo-planner/src/plan/mod.rs @@ -258,7 +258,7 @@ pub struct ArroyoRewriter<'a> { pub(crate) schema_provider: &'a ArroyoSchemaProvider, } -impl<'a> TreeNodeRewriter for ArroyoRewriter<'a> { +impl TreeNodeRewriter for ArroyoRewriter<'_> { type Node = LogicalPlan; fn f_up(&mut self, mut node: Self::Node) -> Result> { diff --git a/crates/arroyo-planner/src/rewriters.rs b/crates/arroyo-planner/src/rewriters.rs index 64da2fdef..3dfbd8131 100644 --- a/crates/arroyo-planner/src/rewriters.rs +++ b/crates/arroyo-planner/src/rewriters.rs @@ -40,7 +40,7 @@ pub struct SourceRewriter<'a> { pub(crate) schema_provider: &'a ArroyoSchemaProvider, } -impl<'a> SourceRewriter<'a> { +impl SourceRewriter<'_> { fn watermark_expression(table: &ConnectorTable) -> DFResult { let expr = match table.watermark_field.clone() { Some(watermark_field) => table @@ -253,7 +253,7 @@ impl<'a> SourceRewriter<'a> { } } -impl<'a> TreeNodeRewriter for SourceRewriter<'a> { +impl TreeNodeRewriter for SourceRewriter<'_> { type Node = LogicalPlan; fn f_up(&mut self, node: Self::Node) -> DFResult> { @@ -502,7 +502,7 @@ impl<'a> AsyncUdfRewriter<'a> { } } -impl<'a> TreeNodeRewriter for AsyncUdfRewriter<'a> { +impl TreeNodeRewriter for AsyncUdfRewriter<'_> { type Node = LogicalPlan; fn f_up(&mut self, node: Self::Node) -> DFResult> { @@ -588,7 +588,7 @@ impl<'a> SourceMetadataVisitor<'a> { } } -impl<'a> SourceMetadataVisitor<'a> { +impl SourceMetadataVisitor<'_> { fn get_connection_id(&self, node: &LogicalPlan) -> Option { let LogicalPlan::Extension(Extension { node }) = node else { return None; @@ -614,7 +614,7 @@ impl<'a> SourceMetadataVisitor<'a> { } } -impl<'a> TreeNodeVisitor<'_> for SourceMetadataVisitor<'a> { +impl TreeNodeVisitor<'_> for SourceMetadataVisitor<'_> { type Node = LogicalPlan; fn f_down(&mut self, node: &Self::Node) -> DFResult { diff --git a/crates/arroyo-rpc/src/var_str.rs b/crates/arroyo-rpc/src/var_str.rs index e2f2b622e..b3687a4fd 100644 --- a/crates/arroyo-rpc/src/var_str.rs +++ b/crates/arroyo-rpc/src/var_str.rs @@ -21,7 +21,7 @@ impl Serialize for VarStr { struct VarStrVisitor; -impl<'de> Visitor<'de> for VarStrVisitor { +impl Visitor<'_> for VarStrVisitor { type Value = VarStr; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { diff --git a/crates/arroyo-state/src/tables/global_keyed_map.rs b/crates/arroyo-state/src/tables/global_keyed_map.rs index d6c41f996..275fd841a 100644 --- a/crates/arroyo-state/src/tables/global_keyed_map.rs +++ b/crates/arroyo-state/src/tables/global_keyed_map.rs @@ -47,6 +47,7 @@ pub struct GlobalKeyedTable { } impl GlobalKeyedTable { + #[allow(clippy::type_complexity)] fn get_key_value_iterator<'a>( &self, record_batch: &'a RecordBatch, diff --git a/crates/arroyo-types/src/lib.rs b/crates/arroyo-types/src/lib.rs index 3a0a4d98b..95b37bb36 100644 --- a/crates/arroyo-types/src/lib.rs +++ b/crates/arroyo-types/src/lib.rs @@ -458,7 +458,7 @@ pub struct CheckpointBarrier { pub struct DisplayAsSql<'a>(pub &'a DataType); -impl<'a> Display for DisplayAsSql<'a> { +impl Display for DisplayAsSql<'_> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.0 { DataType::Boolean => write!(f, "BOOLEAN"),