Skip to content

Commit

Permalink
Rely on AWS cred cache, but increase buffer time and add timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jan 7, 2025
1 parent 79ac889 commit 5443b5c
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ datafusion-functions-window = {git = 'https://github.com/ArroyoSystems/arrow-dat

datafusion-functions-json = {git = 'https://github.com/ArroyoSystems/datafusion-functions-json', branch = 'datafusion_43'}

# object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.11.1/arroyo' }
object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'public_token_cache' }
object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.11.1/arroyo' }

cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
Expand Down
7 changes: 0 additions & 7 deletions crates/arroyo-controller/src/schedulers/kubernetes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,6 @@ impl Scheduler for KubernetesScheduler {
let replicas = (req.slots as f32 / config().kubernetes_scheduler.worker.task_slots as f32)
.ceil() as usize;

info!(
job_id = *req.job_id,
message = "starting workers on k8s",
replicas,
task_slots = req.slots
);

let max_slots_per_pod = config().kubernetes_scheduler.worker.task_slots as usize;
let mut slots_scheduled = 0;
let mut pods = vec![];
Expand Down
88 changes: 44 additions & 44 deletions crates/arroyo-storage/src/aws.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::StorageError;
use aws_config::BehaviorVersion;
use aws_config::identity::IdentityCache;
use aws_config::timeout::TimeoutConfig;
use aws_config::{BehaviorVersion, SdkConfig};
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
use object_store::{aws::AwsCredential, CredentialProvider, TemporaryToken, TokenCache};
use std::error::Error;
use object_store::{aws::AwsCredential, CredentialProvider};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use tokio::sync::OnceCell;

pub struct ArroyoCredentialProvider {
cache: TokenCache<Arc<AwsCredential>>,
provider: aws_credential_types::provider::SharedCredentialsProvider,
provider: SharedCredentialsProvider,
}

impl std::fmt::Debug for ArroyoCredentialProvider {
Expand All @@ -17,9 +18,34 @@ impl std::fmt::Debug for ArroyoCredentialProvider {
}
}

static AWS_CONFIG: OnceCell<Arc<SdkConfig>> = OnceCell::const_new();

async fn get_config<'a>() -> &'a SdkConfig {
&*AWS_CONFIG
.get_or_init(|| async {
Arc::new(
aws_config::defaults(BehaviorVersion::latest())
.timeout_config(
TimeoutConfig::builder()
.operation_timeout(Duration::from_secs(60))
.operation_attempt_timeout(Duration::from_secs(5))
.build(),
)
.identity_cache(
IdentityCache::lazy()
.buffer_time(Duration::from_secs(60 * 5))
.build(),
)
.load()
.await,
)
})
.await
}

impl ArroyoCredentialProvider {
pub async fn try_new() -> Result<Self, StorageError> {
let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
let config = get_config().await;

let credentials = config
.credentials_provider()
Expand All @@ -31,56 +57,30 @@ impl ArroyoCredentialProvider {
.clone();

Ok(Self {
cache: TokenCache::default().with_min_ttl(Duration::from_secs(60)),
provider: credentials,
})
}

pub async fn default_region() -> Option<String> {
aws_config::defaults(BehaviorVersion::latest())
.load()
.await
.region()
.map(|r| r.to_string())
get_config().await.region().map(|r| r.to_string())
}
}

async fn get_token(
provider: &SharedCredentialsProvider,
) -> Result<TemporaryToken<Arc<AwsCredential>>, Box<dyn Error + Send + Sync>> {
let creds = provider
.provide_credentials()
.await
.map_err(|e| object_store::Error::Generic {
store: "S3",
source: Box::new(e),
})?;

let expiry = creds
.expiry()
.map(|exp| Instant::now() + exp.elapsed().unwrap_or_default());

Ok(TemporaryToken {
token: Arc::new(AwsCredential {
key_id: creds.access_key_id().to_string(),
secret_key: creds.secret_access_key().to_string(),
token: creds.session_token().map(ToString::to_string),
}),
expiry,
})
}

#[async_trait::async_trait]
impl CredentialProvider for ArroyoCredentialProvider {
type Credential = AwsCredential;

async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
self.cache
.get_or_insert_with(|| get_token(&self.provider))
.await
.map_err(|e| object_store::Error::Generic {
let creds = self.provider.provide_credentials().await.map_err(|e| {
object_store::Error::Generic {
store: "S3",
source: e,
})
source: Box::new(e),
}
})?;
Ok(Arc::new(AwsCredential {
key_id: creds.access_key_id().to_string(),
secret_key: creds.secret_access_key().to_string(),
token: creds.session_token().map(ToString::to_string),
}))
}
}

0 comments on commit 5443b5c

Please sign in to comment.