Commit

GQL: Datasets: auth checks
s373r committed Dec 26, 2024
1 parent e5bfda0 commit bae7eda
Showing 7 changed files with 101 additions and 70 deletions.
5 changes: 4 additions & 1 deletion src/adapter/graphql/src/queries/accounts/account.rs
@@ -90,7 +90,10 @@ impl Account {
alias: &odf::DatasetAlias,
) -> Result<Option<Self>, InternalError> {
if alias.is_multi_tenant() {
Ok(Self::from_account_name(ctx, alias.account_name.as_ref().unwrap().clone()).await?)
// Safety: In multi-tenant, we have a name.
let account_name = alias.account_name.as_ref().unwrap().clone();

Ok(Self::from_account_name(ctx, account_name).await?)
} else {
let current_account_subject = from_catalog_n!(ctx, CurrentAccountSubject);

4 changes: 2 additions & 2 deletions src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
@@ -54,8 +54,8 @@ impl AccountFlowRuns {
by_flow_status: filters.by_status.map(Into::into),
by_dataset_ids: filters
.by_dataset_ids
.iter()
.map(|dataset_id| dataset_id.clone().into())
.into_iter()
.map(|dataset_id| dataset_id.into())

Check failure on line 58 in src/adapter/graphql/src/queries/accounts/account_flow_runs.rs (GitHub Actions / Lint / Code): redundant closure
.collect::<HashSet<_>>(),
by_initiator: match filters.by_initiator {
Some(initiator_filter) => match initiator_filter {
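
Side note on the lint failure above: clippy's `redundant_closure` warning is usually addressed by passing the conversion function itself rather than wrapping it in a closure. A minimal sketch of such a follow-up, assuming the flagged closure is the `.map(|dataset_id| dataset_id.into())` call and that type inference still resolves the target ID type:

    by_dataset_ids: filters
        .by_dataset_ids
        .into_iter()
        .map(Into::into) // same conversion as the closure, expressed as a function path
        .collect::<HashSet<_>>(),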
4 changes: 4 additions & 0 deletions src/adapter/graphql/src/queries/admin/admin.rs
@@ -10,6 +10,8 @@
use crate::prelude::*;
use crate::AdminGuard;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct Admin;

#[Object]
@@ -20,3 +22,5 @@ impl Admin {
Ok("OK".to_string())
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
22 changes: 13 additions & 9 deletions src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs
@@ -15,6 +15,8 @@ use opendatafabric as odf;
use crate::prelude::*;
use crate::queries::*;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct DatasetEndpoints<'a> {
owner: &'a Account,
dataset_handle: odf::DatasetHandle,
@@ -48,7 +50,7 @@ impl<'a> DatasetEndpoints<'a> {
self.dataset_handle.alias.dataset_name.as_str()
}

#[allow(clippy::unused_async)]
#[expect(clippy::unused_async)]
async fn web_link(&self) -> Result<LinkProtocolDesc> {
let url = format!(
"{}{}/{}",
@@ -60,7 +62,7 @@ impl<'a> DatasetEndpoints<'a> {
Ok(LinkProtocolDesc { url })
}

#[allow(clippy::unused_async)]
#[expect(clippy::unused_async)]
async fn cli(&self) -> Result<CliProtocolDesc> {
let url = format!(
"odf+{}{}",
@@ -78,7 +80,7 @@ impl<'a> DatasetEndpoints<'a> {
})
}

#[allow(clippy::unused_async)]
#[expect(clippy::unused_async)]
async fn rest(&self) -> Result<RestProtocolDesc> {
let dataset_base_url = format!(
"{}{}",
@@ -101,14 +103,14 @@ impl<'a> DatasetEndpoints<'a> {
})
}

#[allow(clippy::unused_async)]
#[expect(clippy::unused_async)]
async fn flightsql(&self) -> Result<FlightSqlDesc> {
Ok(FlightSqlDesc {
url: self.config.protocols.base_url_flightsql.to_string(),
})
}

#[allow(clippy::unused_async)]
#[expect(clippy::unused_async)]
async fn jdbc(&self) -> Result<JdbcDesc> {
let mut url = self.config.protocols.base_url_flightsql.clone();

@@ -119,28 +121,28 @@ impl<'a> DatasetEndpoints<'a> {
})
}

#[allow(clippy::unused_async)]
#[expect(clippy::unused_async)]
async fn postgresql(&self) -> Result<PostgreSqlDesl> {
Ok(PostgreSqlDesl {
url: "- coming soon -".to_string(),
})
}

#[allow(clippy::unused_async)]
#[expect(clippy::unused_async)]
async fn kafka(&self) -> Result<KafkaProtocolDesc> {
Ok(KafkaProtocolDesc {
url: "- coming soon -".to_string(),
})
}

#[allow(clippy::unused_async)]
#[expect(clippy::unused_async)]
async fn websocket(&self) -> Result<WebSocketProtocolDesc> {
Ok(WebSocketProtocolDesc {
url: "- coming soon -".to_string(),
})
}

#[allow(clippy::unused_async)]
#[expect(clippy::unused_async)]
async fn odata(&self) -> Result<OdataProtocolDesc> {
let mut url = format!("{}odata", self.config.protocols.base_url_rest);
// to respect both kinds of workspaces: single-tenant & multi-tenant
@@ -168,3 +170,5 @@ impl<'a> DatasetEndpoints<'a> {
})
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -36,7 +36,7 @@ impl ViewDatasetEnvVar {
self.env_var.key.clone()
}

/// Non sercret value of dataset environment variable
/// Non secret value of dataset environment variable
#[allow(clippy::unused_async)]
async fn value(&self) -> Option<String> {
self.env_var.get_non_secret_value()
@@ -11,7 +11,6 @@ use kamu_flow_system::{FlowConfigurationService, FlowKeyDataset};
use opendatafabric as odf;

use crate::prelude::*;
use crate::utils::check_dataset_read_access;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

@@ -32,8 +31,6 @@ impl DatasetFlowConfigs {
ctx: &Context<'_>,
dataset_flow_type: DatasetFlowType,
) -> Result<Option<FlowConfiguration>> {
check_dataset_read_access(ctx, &self.dataset_handle).await?;

let flow_config_service = from_catalog_n!(ctx, dyn FlowConfigurationService);
let maybe_flow_config = flow_config_service
.find_configuration(
@@ -48,8 +45,6 @@

/// Checks if all configs of this dataset are disabled
async fn all_paused(&self, ctx: &Context<'_>) -> Result<bool> {
check_dataset_read_access(ctx, &self.dataset_handle).await?;

let flow_config_service = from_catalog_n!(ctx, dyn FlowConfigurationService);
for dataset_flow_type in kamu_flow_system::DatasetFlowType::all() {
let maybe_flow_config = flow_config_service
129 changes: 77 additions & 52 deletions src/adapter/graphql/src/queries/datasets/datasets.rs
@@ -7,7 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use futures::TryStreamExt;
use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer};
use kamu_core::{
DatasetRegistryExt,
{self as domain},
@@ -16,6 +16,7 @@ use opendatafabric as odf;

use crate::prelude::*;
use crate::queries::*;
use crate::utils::check_dataset_read_access;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

@@ -24,22 +25,39 @@ pub struct Datasets;
#[Object]
impl Datasets {
const DEFAULT_PER_PAGE: usize = 15;
const DATASETS_CHUNK_SIZE: usize = 100;

/// Returns dataset by its ID
async fn by_id(&self, ctx: &Context<'_>, dataset_id: DatasetID) -> Result<Option<Dataset>> {
#[graphql(skip)]
async fn by_dataset_ref(
&self,
ctx: &Context<'_>,
dataset_ref: &odf::DatasetRef,
) -> Result<Option<Dataset>> {
let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry);
let hdl = dataset_registry
.try_resolve_dataset_handle_by_ref(&dataset_id.as_local_ref())
let maybe_handle = dataset_registry
.try_resolve_dataset_handle_by_ref(dataset_ref)
.await?;
Ok(match hdl {
Some(h) => {
let account = Account::from_dataset_alias(ctx, &h.alias)
.await?
.expect("Account must exist");
Some(Dataset::new(account, h))
}
None => None,
})

let Some(handle) = maybe_handle else {
return Ok(None);
};

if let Err(_) = check_dataset_read_access(ctx, &handle).await {

Check failure on line 45 in src/adapter/graphql/src/queries/datasets/datasets.rs (GitHub Actions / Lint / Code): redundant pattern matching, consider using `is_err()`
return Ok(None);
}

let account = Account::from_dataset_alias(ctx, &handle.alias)
.await?
.expect("Account must exist");

Ok(Some(Dataset::new(account, handle)))
}
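
Side note on the lint failure above: clippy's `redundant_pattern_matching` warning suggests testing the result directly rather than matching on `Err(_)`. A minimal sketch of that follow-up, with identical behavior:

    if check_dataset_read_access(ctx, &handle).await.is_err() {
        return Ok(None);
    }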

/// Returns dataset by its ID
async fn by_id(&self, ctx: &Context<'_>, dataset_id: DatasetID) -> Result<Option<Dataset>> {
let dataset_id: odf::DatasetID = dataset_id.into();

self.by_dataset_ref(ctx, &dataset_id.into_local_ref()).await
}

/// Returns dataset by its owner and name
@@ -49,22 +67,10 @@ impl Datasets {
account_name: AccountName,
dataset_name: DatasetName,
) -> Result<Option<Dataset>> {
let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry);
let dataset_alias = odf::DatasetAlias::new(Some(account_name.into()), dataset_name.into());
let hdl = dataset_registry
.try_resolve_dataset_handle_by_ref(&dataset_alias.into_local_ref())
.await?;

Ok(match hdl {
Some(h) => {
let account = Account::from_dataset_alias(ctx, &h.alias)
.await?
.expect("Account must exist");

Some(Dataset::new(account, h))
}
None => None,
})
self.by_dataset_ref(ctx, &dataset_alias.into_local_ref())
.await
}

#[graphql(skip)]
@@ -75,25 +81,47 @@ impl Datasets {
page: Option<usize>,
per_page: Option<usize>,
) -> Result<DatasetConnection> {
let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry);
let (dataset_registry, dataset_action_authorizer) = from_catalog_n!(
ctx,
dyn domain::DatasetRegistry,
dyn DatasetActionAuthorizer
);

let page = page.unwrap_or(0);
let per_page = per_page.unwrap_or(Self::DEFAULT_PER_PAGE);

let account_name = account_ref.account_name_internal();

let mut all_datasets: Vec<_> = dataset_registry
use futures::TryStreamExt;

let mut account_owned_datasets_stream = dataset_registry
.all_dataset_handles_by_owner(&account_name.clone().into())
.try_collect()
.await?;
let total_count = all_datasets.len();
all_datasets.sort_by(|a, b| a.alias.cmp(&b.alias));
.try_chunks(Self::DATASETS_CHUNK_SIZE);
let mut accessible_datasets_handles = Vec::new();

while let Some(account_owned_dataset_handles_chunk) =
account_owned_datasets_stream.try_next().await.int_err()?
{
let authorized_handles = dataset_action_authorizer
.classify_datasets_by_allowance(
account_owned_dataset_handles_chunk,
DatasetAction::Read,
)
.await?
.authorized_handles;

accessible_datasets_handles.extend(authorized_handles);
}

let nodes = all_datasets
let total_count = accessible_datasets_handles.len();

accessible_datasets_handles.sort_by(|a, b| a.alias.cmp(&b.alias));

let nodes = accessible_datasets_handles
.into_iter()
.skip(page * per_page)
.take(per_page)
.map(|hdl| Dataset::new(account_ref.clone(), hdl))
.map(|handle| Dataset::new(account_ref.clone(), handle))
.collect();

Ok(DatasetConnection::new(nodes, page, per_page, total_count))
@@ -114,22 +142,19 @@ impl Datasets {
.find_account_name_by_id(&account_id)
.await?;

match maybe_account_name {
Some(account_name) => {
self.by_account_impl(
ctx,
Account::new(account_id.into(), account_name.into()),
page,
per_page,
)
.await
}
None => {
let page = page.unwrap_or(0);
let per_page = per_page.unwrap_or(Self::DEFAULT_PER_PAGE);

Ok(DatasetConnection::new(vec![], page, per_page, 0))
}
if let Some(account_name) = maybe_account_name {
self.by_account_impl(
ctx,
Account::new(account_id.into(), account_name.into()),
page,
per_page,
)
.await
} else {
let page = page.unwrap_or(0);
let per_page = per_page.unwrap_or(Self::DEFAULT_PER_PAGE);

Ok(DatasetConnection::new(vec![], page, per_page, 0))
}
}
