From 9412b0415893717d1f8a9504ff3ceb37b45ac20b Mon Sep 17 00:00:00 2001 From: Matt Forbes Date: Thu, 21 Nov 2024 14:13:22 -0800 Subject: [PATCH] lint --- crates/arroyo-connectors/src/kafka/sink/mod.rs | 3 ++- crates/arroyo-connectors/src/kafka/source/mod.rs | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/arroyo-connectors/src/kafka/sink/mod.rs b/crates/arroyo-connectors/src/kafka/sink/mod.rs index 696f96535..4b90d4080 100644 --- a/crates/arroyo-connectors/src/kafka/sink/mod.rs +++ b/crates/arroyo-connectors/src/kafka/sink/mod.rs @@ -152,7 +152,8 @@ impl KafkaSinkFunc { next_transaction_index ); client_config.set("transactional.id", transactional_id); - let producer: FutureProducer = client_config.create_with_context(self.context.clone())?; + let producer: FutureProducer = + client_config.create_with_context(self.context.clone())?; producer.init_transactions(Timeout::After(Duration::from_secs(30)))?; producer.begin_transaction()?; *next_transaction_index += 1; diff --git a/crates/arroyo-connectors/src/kafka/source/mod.rs b/crates/arroyo-connectors/src/kafka/source/mod.rs index 66dfbad8c..299bf40b6 100644 --- a/crates/arroyo-connectors/src/kafka/source/mod.rs +++ b/crates/arroyo-connectors/src/kafka/source/mod.rs @@ -1,7 +1,7 @@ use anyhow::bail; -use futures::FutureExt; use async_trait::async_trait; use bincode::{Decode, Encode}; +use futures::FutureExt; use governor::{Quota, RateLimiter as GovernorRateLimiter}; use rdkafka::consumer::{CommitMode, Consumer}; use rdkafka::{ClientConfig, Message as KMessage, Offset, TopicPartitionList}; @@ -14,13 +14,13 @@ use tokio::time::MissedTickBehavior; use tracing::{debug, error, info, warn}; use arroyo_formats::de::FieldValueType; +use arroyo_operator::context::ArrowContext; +use arroyo_operator::operator::SourceOperator; +use arroyo_operator::SourceFinishType; use arroyo_rpc::formats::{BadData, Format, Framing}; use arroyo_rpc::grpc::rpc::TableConfig; use arroyo_rpc::schema_resolver::SchemaResolver; use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp, MetadataField}; -use arroyo_operator::context::ArrowContext; -use arroyo_operator::operator::SourceOperator; -use arroyo_operator::SourceFinishType; use arroyo_types::*; use super::{Context, SourceOffset, StreamConsumer};