Skip to content

Commit

Permalink
Use SelfSignedJwt for Service Accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 13, 2023
1 parent b79b1b6 commit 405b787
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 94 deletions.
131 changes: 49 additions & 82 deletions object_store/src/gcp/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::info;

// TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes
pub const DEFAULT_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.full_control";
pub const DEFAULT_AUDIENCE: &str = "https://www.googleapis.com/oauth2/v4/token";

pub const DEFAULT_GCS_BASE_URL: &str = "https://storage.googleapis.com";

#[derive(Debug, Snafu)]
pub enum Error {
Expand Down Expand Up @@ -92,48 +92,48 @@ pub struct GcpCredential {
pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Default, serde::Serialize)]
pub struct JwtHeader {
pub struct JwtHeader<'a> {
/// The type of JWS: it can only be "JWT" here
///
/// Defined in [RFC7515#4.1.9](https://tools.ietf.org/html/rfc7515#section-4.1.9).
#[serde(skip_serializing_if = "Option::is_none")]
pub typ: Option<String>,
pub typ: Option<&'a str>,
/// The algorithm used
///
/// Defined in [RFC7515#4.1.1](https://tools.ietf.org/html/rfc7515#section-4.1.1).
pub alg: String,
pub alg: &'a str,
/// Content type
///
/// Defined in [RFC7519#5.2](https://tools.ietf.org/html/rfc7519#section-5.2).
#[serde(skip_serializing_if = "Option::is_none")]
pub cty: Option<String>,
pub cty: Option<&'a str>,
/// JSON Key URL
///
/// Defined in [RFC7515#4.1.2](https://tools.ietf.org/html/rfc7515#section-4.1.2).
#[serde(skip_serializing_if = "Option::is_none")]
pub jku: Option<String>,
pub jku: Option<&'a str>,
/// Key ID
///
/// Defined in [RFC7515#4.1.4](https://tools.ietf.org/html/rfc7515#section-4.1.4).
#[serde(skip_serializing_if = "Option::is_none")]
pub kid: Option<String>,
pub kid: Option<&'a str>,
/// X.509 URL
///
/// Defined in [RFC7515#4.1.5](https://tools.ietf.org/html/rfc7515#section-4.1.5).
#[serde(skip_serializing_if = "Option::is_none")]
pub x5u: Option<String>,
pub x5u: Option<&'a str>,
/// X.509 certificate thumbprint
///
/// Defined in [RFC7515#4.1.7](https://tools.ietf.org/html/rfc7515#section-4.1.7).
#[serde(skip_serializing_if = "Option::is_none")]
pub x5t: Option<String>,
pub x5t: Option<&'a str>,
}

#[derive(serde::Serialize)]
struct TokenClaims<'a> {
iss: &'a str,
sub: &'a str,
scope: &'a str,
aud: &'a str,
exp: u64,
iat: u64,
}
Expand All @@ -144,61 +144,61 @@ struct TokenResponse {
expires_in: u64,
}

/// Encapsulates the logic to perform an OAuth token challenge
/// <https://google.aip.dev/auth/4111>
#[derive(Debug)]
pub struct OAuthProvider {
pub struct SelfSignedJwt {
issuer: String,
scope: String,
audience: String,
key_pair: RsaKeyPair,
jwt_header: String,
random: ring::rand::SystemRandom,
}

impl OAuthProvider {
/// Create a new [`OAuthProvider`]
impl SelfSignedJwt {
/// Create a new [`SelfSignedJwt`]
pub fn new(
key_id: String,
issuer: String,
private_key_pem: String,
scope: String,
audience: String,
) -> Result<Self> {
let key_pair = decode_first_rsa_key(private_key_pem)?;
let jwt_header = b64_encode_obj(&JwtHeader {
alg: "RS256".to_string(),
alg: "RS256",
typ: Some("JWT"),
kid: Some(&key_id),
..Default::default()
})?;

Ok(Self {
issuer,
key_pair,
scope,
audience,
jwt_header,
random: ring::rand::SystemRandom::new(),
})
}
}

#[async_trait]
impl TokenProvider for OAuthProvider {
impl TokenProvider for SelfSignedJwt {
type Credential = GcpCredential;

/// Fetch a fresh token
async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
_client: &Client,
_retry: &RetryConfig,
) -> crate::Result<TemporaryToken<Arc<GcpCredential>>> {
let now = seconds_since_epoch();
let exp = now + 3600;

let claims = TokenClaims {
iss: &self.issuer,
sub: &self.issuer,
scope: &self.scope,
aud: &self.audience,
exp,
iat: now,
exp,
};

let claim_str = b64_encode_obj(&claims)?;
Expand All @@ -214,30 +214,11 @@ impl TokenProvider for OAuthProvider {
.context(SignSnafu)?;

let signature = BASE64_URL_SAFE_NO_PAD.encode(sig_bytes);
let jwt = [message, signature].join(".");

let body = [
("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"),
("assertion", &jwt),
];

let response: TokenResponse = client
.request(Method::POST, &self.audience)
.form(&body)
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
.json()
.await
.context(TokenResponseBodySnafu)?;

println!("{}", response.access_token);
let bearer = [message, signature].join(".");

Ok(TemporaryToken {
token: Arc::new(GcpCredential {
bearer: response.access_token,
}),
expiry: Some(Instant::now() + Duration::from_secs(response.expires_in)),
token: Arc::new(GcpCredential { bearer }),
expiry: Some(Instant::now() + Duration::from_secs(3600)),
})
}
}
Expand All @@ -261,29 +242,24 @@ pub struct ServiceAccountCredentials {
/// The private key in RSA format.
pub private_key: String,

/// The private key ID
pub private_key_id: String,

/// The email address associated with the service account.
pub client_email: String,

/// Base URL for GCS
#[serde(default = "default_gcs_base_url")]
pub gcs_base_url: String,
#[serde(default)]
pub gcs_base_url: Option<String>,

/// Disable oauth and use empty tokens.
#[serde(default = "default_disable_oauth")]
#[serde(default)]
pub disable_oauth: bool,
}

pub fn default_gcs_base_url() -> String {
"https://storage.googleapis.com".to_owned()
}

pub fn default_disable_oauth() -> bool {
false
}

impl ServiceAccountCredentials {
/// Create a new [`ServiceAccountCredentials`] from a file.
pub fn from_file<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
read_credentials_file(path)
}

Expand All @@ -292,13 +268,19 @@ impl ServiceAccountCredentials {
serde_json::from_str(key).context(DecodeCredentialsSnafu)
}

/// Create an [`OAuthProvider`] from this credentials struct.
pub fn oauth_provider(self) -> crate::Result<OAuthProvider> {
Ok(OAuthProvider::new(
/// Create a [`SelfSignedJwt`] from this credentials struct.
///
/// We use a scope of [`DEFAULT_SCOPE`] as opposed to an audience
/// as GCS appears to not support audience
///
/// <https://stackoverflow.com/questions/63222450/service-account-authorization-without-oauth-can-we-get-file-from-google-cloud/71834557#71834557>
/// <https://www.codejam.info/2022/05/google-cloud-service-account-authorization-without-oauth.html>
pub fn token_provider(self) -> crate::Result<SelfSignedJwt> {
Ok(SelfSignedJwt::new(
self.private_key_id,
self.client_email,
self.private_key,
DEFAULT_SCOPE.to_string(),
DEFAULT_AUDIENCE.to_string(),
)?)
}
}
Expand Down Expand Up @@ -335,33 +317,21 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
///
/// <https://cloud.google.com/docs/authentication/get-id-token#metadata-server>
#[derive(Debug, Default)]
pub struct InstanceCredentialProvider {
audience: String,
}

impl InstanceCredentialProvider {
/// Create a new [`InstanceCredentialProvider`], we need to control the client in order to enable http access so save the options.
pub fn new<T: Into<String>>(audience: T) -> Self {
Self {
audience: audience.into(),
}
}
}
pub struct InstanceCredentialProvider {}

/// Make a request to the metadata server to fetch a token, using a a given hostname.
async fn make_metadata_request(
client: &Client,
hostname: &str,
retry: &RetryConfig,
audience: &str,
) -> crate::Result<TokenResponse> {
let url = format!(
"http://{hostname}/computeMetadata/v1/instance/service-accounts/default/token"
);
let response: TokenResponse = client
.request(Method::GET, url)
.header("Metadata-Flavor", "Google")
.query(&[("audience", audience)])
.query(&[("audience", "https://www.googleapis.com/oauth2/v4/token")])
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
Expand All @@ -386,12 +356,9 @@ impl TokenProvider for InstanceCredentialProvider {
const METADATA_HOST: &str = "metadata";

info!("fetching token from metadata server");
let response =
make_metadata_request(client, METADATA_HOST, retry, &self.audience)
.or_else(|_| {
make_metadata_request(client, METADATA_IP, retry, &self.audience)
})
.await?;
let response = make_metadata_request(client, METADATA_HOST, retry)
.or_else(|_| make_metadata_request(client, METADATA_IP, retry))
.await?;
let token = TemporaryToken {
token: Arc::new(GcpCredential {
bearer: response.access_token,
Expand Down
22 changes: 10 additions & 12 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,15 @@ use crate::{
ObjectStore, Result, RetryConfig,
};

use credential::{
default_gcs_base_url, InstanceCredentialProvider, ServiceAccountCredentials,
};
use credential::{InstanceCredentialProvider, ServiceAccountCredentials};

mod credential;

const STORE: &str = "GCS";

/// [`CredentialProvider`] for [`GoogleCloudStorage`]
pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
use crate::gcp::credential::{ApplicationDefaultCredentials, DEFAULT_AUDIENCE};
use crate::gcp::credential::{ApplicationDefaultCredentials, DEFAULT_GCS_BASE_URL};
pub use credential::GcpCredential;

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -1052,10 +1050,10 @@ impl GoogleCloudStorageBuilder {
.map(|c| c.disable_oauth)
.unwrap_or(false);

let gcs_base_url = service_account_credentials
let gcs_base_url: String = service_account_credentials
.as_ref()
.map(|c| c.gcs_base_url.clone())
.unwrap_or_else(default_gcs_base_url);
.and_then(|c| c.gcs_base_url.clone())
.unwrap_or_else(|| DEFAULT_GCS_BASE_URL.to_string());

let credentials = if let Some(credentials) = self.credentials {
credentials
Expand All @@ -1065,7 +1063,7 @@ impl GoogleCloudStorageBuilder {
})) as _
} else if let Some(credentials) = service_account_credentials {
Arc::new(TokenCredentialProvider::new(
credentials.oauth_provider()?,
credentials.token_provider()?,
self.client_options.client()?,
self.retry_config.clone(),
)) as _
Expand All @@ -1080,15 +1078,15 @@ impl GoogleCloudStorageBuilder {
}
ApplicationDefaultCredentials::ServiceAccount(token) => {
Arc::new(TokenCredentialProvider::new(
token.oauth_provider()?,
token.token_provider()?,
self.client_options.client()?,
self.retry_config.clone(),
)) as _
}
}
} else {
Arc::new(TokenCredentialProvider::new(
InstanceCredentialProvider::new(DEFAULT_AUDIENCE),
InstanceCredentialProvider::default(),
self.client_options.clone().with_allow_http(true).client()?,
self.retry_config.clone(),
)) as _
Expand Down Expand Up @@ -1123,7 +1121,7 @@ mod test {

use super::*;

const FAKE_KEY: &str = r#"{"private_key": "private_key", "client_email":"client_email", "disable_oauth":true}"#;
const FAKE_KEY: &str = r#"{"private_key": "private_key", "private_key_id": "private_key_id", "client_email":"client_email", "disable_oauth":true}"#;
const NON_EXISTENT_NAME: &str = "nonexistentname";

#[tokio::test]
Expand All @@ -1135,7 +1133,7 @@ mod test {
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
if integration.client.base_url == default_gcs_base_url() {
if integration.client.base_url == DEFAULT_GCS_BASE_URL {
// Fake GCS server doesn't currently honor ifGenerationMatch
// https://github.com/fsouza/fake-gcs-server/issues/994
copy_if_not_exists(&integration).await;
Expand Down

0 comments on commit 405b787

Please sign in to comment.