Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
emef committed Nov 21, 2024
1 parent c0ad9b0 commit 9412b04
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
3 changes: 2 additions & 1 deletion crates/arroyo-connectors/src/kafka/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ impl KafkaSinkFunc {
next_transaction_index
);
client_config.set("transactional.id", transactional_id);
let producer: FutureProducer = client_config.create_with_context(self.context.clone())?;
let producer: FutureProducer =
client_config.create_with_context(self.context.clone())?;
producer.init_transactions(Timeout::After(Duration::from_secs(30)))?;
producer.begin_transaction()?;
*next_transaction_index += 1;
Expand Down
8 changes: 4 additions & 4 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::bail;
use futures::FutureExt;
use async_trait::async_trait;
use bincode::{Decode, Encode};
use futures::FutureExt;
use governor::{Quota, RateLimiter as GovernorRateLimiter};
use rdkafka::consumer::{CommitMode, Consumer};
use rdkafka::{ClientConfig, Message as KMessage, Offset, TopicPartitionList};
Expand All @@ -14,13 +14,13 @@ use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, warn};

use arroyo_formats::de::FieldValueType;
use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::SourceOperator;
use arroyo_operator::SourceFinishType;
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_rpc::grpc::rpc::TableConfig;
use arroyo_rpc::schema_resolver::SchemaResolver;
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp, MetadataField};
use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::SourceOperator;
use arroyo_operator::SourceFinishType;
use arroyo_types::*;

use super::{Context, SourceOffset, StreamConsumer};
Expand Down

0 comments on commit 9412b04

Please sign in to comment.