Skip to content

Commit

Permalink
trigger oauth refresh on first use
Browse files Browse the repository at this point in the history
  • Loading branch information
emef committed Nov 21, 2024
1 parent 16e22c1 commit c0ad9b0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 11 deletions.
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ regex = "1"
aws-sdk-kafka = { version = "1.44" }
aws-msk-iam-sasl-signer = "1.0.0"
rdkafka = { version = "0.36", features = ["cmake-build", "tracing", "sasl", "ssl-vendored"] }
rdkafka-sys = "4.5.0"
rdkafka-sys = "4.7.0"
sasl2-sys = { version = "0.1.6", features = ["vendored"] }

# SSE
Expand Down
7 changes: 7 additions & 0 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,12 @@ impl KafkaTester {
.create_with_context(context)
.map_err(|e| format!("invalid kafka config: {:?}", e))?;

// NOTE: this is required to trigger an oauth token refresh (when using
// OAUTHBEARER auth).
if client.poll(Duration::from_secs(0)).is_some() {
return Err("unexpected poll event from new consumer".to_string());
}

tokio::task::spawn_blocking(move || {
client
.fetch_metadata(None, Duration::from_secs(10))
Expand Down Expand Up @@ -980,6 +986,7 @@ impl ClientContext for Context {
});
handle.join().unwrap()??
};

Ok(OAuthToken {
token,
principal_name: "".to_string(),
Expand Down
28 changes: 18 additions & 10 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
use arroyo_formats::de::FieldValueType;
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_rpc::grpc::rpc::TableConfig;
use arroyo_rpc::schema_resolver::SchemaResolver;
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp, MetadataField};

use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::SourceOperator;
use arroyo_operator::SourceFinishType;
use arroyo_types::*;
use anyhow::bail;
use futures::FutureExt;
use async_trait::async_trait;
use bincode::{Decode, Encode};
use governor::{Quota, RateLimiter as GovernorRateLimiter};
Expand All @@ -21,6 +13,16 @@ use tokio::select;
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, warn};

use arroyo_formats::de::FieldValueType;
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_rpc::grpc::rpc::TableConfig;
use arroyo_rpc::schema_resolver::SchemaResolver;
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp, MetadataField};
use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::SourceOperator;
use arroyo_operator::SourceFinishType;
use arroyo_types::*;

use super::{Context, SourceOffset, StreamConsumer};

#[cfg(test)]
Expand Down Expand Up @@ -84,6 +86,12 @@ impl KafkaSourceFunc {
.set("group.id", group_id)
.create_with_context(self.context.clone())?;

// NOTE: this is required to trigger an oauth token refresh (when using
// OAUTHBEARER auth).
if consumer.recv().now_or_never().is_some() {
bail!("unexpected recv before assignments");
}

let state: Vec<_> = ctx
.table_manager
.get_global_keyed_state::<i32, KafkaState>("k")
Expand Down

0 comments on commit c0ad9b0

Please sign in to comment.