Skip to content

Commit

Permalink
re-introduce custom token cache
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jan 8, 2025
1 parent 15f93ad commit 60d590f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ datafusion-functions-window = {git = 'https://github.com/ArroyoSystems/arrow-dat

datafusion-functions-json = {git = 'https://github.com/ArroyoSystems/datafusion-functions-json', branch = 'datafusion_43'}

object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.11.1/arroyo' }
# object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.11.1/arroyo' }
object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'public_token_cache' }

cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
Expand Down
52 changes: 34 additions & 18 deletions crates/arroyo-storage/src/aws.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::StorageError;
use aws_config::identity::IdentityCache;
use aws_config::timeout::TimeoutConfig;
use aws_config::{BehaviorVersion, SdkConfig};
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
use object_store::{aws::AwsCredential, CredentialProvider};
use object_store::{aws::AwsCredential, CredentialProvider, TemporaryToken, TokenCache};
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::OnceCell;

pub struct ArroyoCredentialProvider {
cache: TokenCache<Arc<AwsCredential>>,
provider: SharedCredentialsProvider,
}

Expand All @@ -31,11 +32,6 @@ async fn get_config<'a>() -> &'a SdkConfig {
.operation_attempt_timeout(Duration::from_secs(5))
.build(),
)
.identity_cache(
IdentityCache::lazy()
.buffer_time(Duration::from_secs(60 * 5))
.build(),
)
.load()
.await,
)
Expand All @@ -57,6 +53,7 @@ impl ArroyoCredentialProvider {
.clone();

Ok(Self {
cache: Default::default(),
provider: credentials,
})
}
Expand All @@ -66,21 +63,40 @@ impl ArroyoCredentialProvider {
}
}

async fn get_token(
provider: &SharedCredentialsProvider,
) -> Result<TemporaryToken<Arc<AwsCredential>>, Box<dyn Error + Send + Sync>> {
let creds = provider
.provide_credentials()
.await
.map_err(|e| object_store::Error::Generic {
store: "S3",
source: Box::new(e),
})?;
let expiry = creds
.expiry()
.map(|exp| Instant::now() + exp.elapsed().unwrap_or_default());
Ok(TemporaryToken {
token: Arc::new(AwsCredential {
key_id: creds.access_key_id().to_string(),
secret_key: creds.secret_access_key().to_string(),
token: creds.session_token().map(ToString::to_string),
}),
expiry,
})
}

#[async_trait::async_trait]
impl CredentialProvider for ArroyoCredentialProvider {
type Credential = AwsCredential;

async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
let creds = self.provider.provide_credentials().await.map_err(|e| {
object_store::Error::Generic {
self.cache
.get_or_insert_with(|| get_token(&self.provider))
.await
.map_err(|e| object_store::Error::Generic {
store: "S3",
source: Box::new(e),
}
})?;
Ok(Arc::new(AwsCredential {
key_id: creds.access_key_id().to_string(),
secret_key: creds.secret_access_key().to_string(),
token: creds.session_token().map(ToString::to_string),
}))
source: e,
})
}
}

0 comments on commit 60d590f

Please sign in to comment.