diff --git a/src/adapter/graphql/src/queries/accounts/account.rs b/src/adapter/graphql/src/queries/accounts/account.rs
index 2b004625a..0225549a8 100644
--- a/src/adapter/graphql/src/queries/accounts/account.rs
+++ b/src/adapter/graphql/src/queries/accounts/account.rs
@@ -90,7 +90,10 @@ impl Account {
         alias: &odf::DatasetAlias,
     ) -> Result<Option<Self>, InternalError> {
         if alias.is_multi_tenant() {
-            Ok(Self::from_account_name(ctx, alias.account_name.as_ref().unwrap().clone()).await?)
+            // Safety: In multi-tenant, we have a name.
+            let account_name = alias.account_name.as_ref().unwrap().clone();
+
+            Ok(Self::from_account_name(ctx, account_name).await?)
         } else {
             let current_account_subject = from_catalog_n!(ctx, CurrentAccountSubject);
diff --git a/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs b/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
index 7fb5416e8..6a6cee5b2 100644
--- a/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
+++ b/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
@@ -54,8 +54,8 @@ impl AccountFlowRuns {
                     by_flow_status: filters.by_status.map(Into::into),
                     by_dataset_ids: filters
                         .by_dataset_ids
-                        .iter()
-                        .map(|dataset_id| dataset_id.clone().into())
+                        .into_iter()
+                        .map(|dataset_id| dataset_id.into())
                         .collect::<HashSet<_>>(),
                     by_initiator: match filters.by_initiator {
                         Some(initiator_filter) => match initiator_filter {
diff --git a/src/adapter/graphql/src/queries/admin/admin.rs b/src/adapter/graphql/src/queries/admin/admin.rs
index d72d79ef2..c846cd33b 100644
--- a/src/adapter/graphql/src/queries/admin/admin.rs
+++ b/src/adapter/graphql/src/queries/admin/admin.rs
@@ -10,6 +10,8 @@
 use crate::prelude::*;
 use crate::AdminGuard;
 
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
 pub struct Admin;
 
 #[Object]
@@ -20,3 +22,5 @@ impl Admin {
         Ok("OK".to_string())
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
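
The `account_flow_runs.rs` hunk above replaces a borrowing `iter()` plus a per-element `clone()` with a consuming `into_iter()`, so each ID is converted by value. A minimal sketch of the difference, using hypothetical stand-in types rather than the real GraphQL/domain `DatasetID` wrappers:

```rust
// Stand-ins for the GraphQL-facing and domain-facing ID types
// (hypothetical; the real types live in kamu / opendatafabric).
#[derive(Clone)]
struct GqlDatasetId(String);

struct DomainDatasetId(#[allow(dead_code)] String);

impl From<GqlDatasetId> for DomainDatasetId {
    fn from(id: GqlDatasetId) -> Self {
        DomainDatasetId(id.0)
    }
}

fn main() {
    let ids = vec![GqlDatasetId("a".into()), GqlDatasetId("b".into())];

    // Before: borrow each element, clone it, then convert the clone.
    let borrowed: Vec<DomainDatasetId> =
        ids.iter().map(|id| id.clone().into()).collect();
    assert_eq!(borrowed.len(), 2);

    // After: consume the collection and convert each element directly,
    // with no intermediate clone.
    let converted: Vec<DomainDatasetId> =
        ids.into_iter().map(Into::into).collect();
    assert_eq!(converted.len(), 2);
}
```
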
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs b/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs
index 463f6fd53..43fc8696a 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs
@@ -15,6 +15,8 @@ use opendatafabric as odf;
 use crate::prelude::*;
 use crate::queries::*;
 
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
 pub struct DatasetEndpoints<'a> {
     owner: &'a Account,
     dataset_handle: odf::DatasetHandle,
@@ -48,7 +50,7 @@ impl<'a> DatasetEndpoints<'a> {
         self.dataset_handle.alias.dataset_name.as_str()
     }
 
-    #[allow(clippy::unused_async)]
+    #[expect(clippy::unused_async)]
    async fn web_link(&self) -> Result<LinkProtocolDesc> {
         let url = format!(
             "{}{}/{}",
@@ -60,7 +62,7 @@ impl<'a> DatasetEndpoints<'a> {
         Ok(LinkProtocolDesc { url })
     }
 
-    #[allow(clippy::unused_async)]
+    #[expect(clippy::unused_async)]
     async fn cli(&self) -> Result<CliProtocolDesc> {
         let url = format!(
             "odf+{}{}",
@@ -78,7 +80,7 @@ impl<'a> DatasetEndpoints<'a> {
         })
     }
 
-    #[allow(clippy::unused_async)]
+    #[expect(clippy::unused_async)]
     async fn rest(&self) -> Result<RestProtocolDesc> {
         let dataset_base_url = format!(
             "{}{}",
@@ -101,14 +103,14 @@ impl<'a> DatasetEndpoints<'a> {
         })
     }
 
-    #[allow(clippy::unused_async)]
+    #[expect(clippy::unused_async)]
     async fn flightsql(&self) -> Result<FlightSqlDesc> {
         Ok(FlightSqlDesc {
             url: self.config.protocols.base_url_flightsql.to_string(),
         })
     }
 
-    #[allow(clippy::unused_async)]
+    #[expect(clippy::unused_async)]
     async fn jdbc(&self) -> Result<JdbcDesc> {
         let mut url = self.config.protocols.base_url_flightsql.clone();
 
@@ -119,28 +121,28 @@ impl<'a> DatasetEndpoints<'a> {
         })
     }
 
-    #[allow(clippy::unused_async)]
+    #[expect(clippy::unused_async)]
     async fn postgresql(&self) -> Result<PostgreSqlDesl> {
         Ok(PostgreSqlDesl {
             url: "- coming soon -".to_string(),
         })
     }
 
-    #[allow(clippy::unused_async)]
+    #[expect(clippy::unused_async)]
     async fn kafka(&self) -> Result<KafkaProtocolDesc> {
         Ok(KafkaProtocolDesc {
             url: "- coming soon -".to_string(),
         })
     }
 
-    #[allow(clippy::unused_async)]
+    #[expect(clippy::unused_async)]
     async fn websocket(&self) -> Result<WebSocketProtocolDesc> {
         Ok(WebSocketProtocolDesc {
             url: "- coming soon -".to_string(),
         })
     }
 
-    #[allow(clippy::unused_async)]
+    #[expect(clippy::unused_async)]
     async fn odata(&self) -> Result<OdataProtocolDesc> {
         let mut url = format!("{}odata", self.config.protocols.base_url_rest);
         // to respect both kinds of workspaces: single-tenant & multi-tenant
@@ -168,3 +170,5 @@ impl<'a> DatasetEndpoints<'a> {
         })
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_env_var.rs b/src/adapter/graphql/src/queries/datasets/dataset_env_var.rs
index 1bc95d867..fe9d559b8 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset_env_var.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset_env_var.rs
@@ -36,7 +36,7 @@ impl ViewDatasetEnvVar {
         self.env_var.key.clone()
     }
 
-    /// Non sercret value of dataset environment variable
+    /// Non secret value of dataset environment variable
     #[allow(clippy::unused_async)]
     async fn value(&self) -> Option<String> {
         self.env_var.get_non_secret_value()
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs b/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
index a094fea26..863b35d9d 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
@@ -11,7 +11,6 @@
 use kamu_flow_system::{FlowConfigurationService, FlowKeyDataset};
 use opendatafabric as odf;
 
 use crate::prelude::*;
-use crate::utils::check_dataset_read_access;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -32,8 +31,6 @@ impl DatasetFlowConfigs {
         ctx: &Context<'_>,
         dataset_flow_type: DatasetFlowType,
     ) -> Result<Option<FlowConfiguration>> {
-        check_dataset_read_access(ctx, &self.dataset_handle).await?;
-
         let flow_config_service = from_catalog_n!(ctx, dyn FlowConfigurationService);
         let maybe_flow_config = flow_config_service
             .find_configuration(
@@ -48,8 +45,6 @@ impl DatasetFlowConfigs {
 
     /// Checks if all configs of this dataset are disabled
     async fn all_paused(&self, ctx: &Context<'_>) -> Result<bool> {
-        check_dataset_read_access(ctx, &self.dataset_handle).await?;
-
         let flow_config_service = from_catalog_n!(ctx, dyn FlowConfigurationService);
         for dataset_flow_type in kamu_flow_system::DatasetFlowType::all() {
             let maybe_flow_config = flow_config_service
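
Throughout `dataset_endpoints.rs`, `#[allow(clippy::unused_async)]` becomes `#[expect(clippy::unused_async)]`. The difference is that `#[expect]` (stable since Rust 1.81) emits an `unfulfilled_lint_expectations` warning once the suppressed lint no longer fires, so the suppression cannot silently outlive its reason. A self-contained illustration:

```rust
// This async fn contains no `.await`, so `clippy::unused_async` fires
// and the expectation below is fulfilled (no warning is shown).
//
// If the body later gains a real `.await`, the lint stops firing and
// the `#[expect]` attribute itself is reported as unfulfilled,
// prompting its removal -- a stale `#[allow]` would just linger.
#[expect(clippy::unused_async)]
async fn health_check() -> &'static str {
    "OK"
}

fn main() {
    // The future is created but never polled; enough for a lint demo.
    let _ = health_check();
}
```
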
diff --git a/src/adapter/graphql/src/queries/datasets/datasets.rs b/src/adapter/graphql/src/queries/datasets/datasets.rs
index 546ccfa85..14fa2aa05 100644
--- a/src/adapter/graphql/src/queries/datasets/datasets.rs
+++ b/src/adapter/graphql/src/queries/datasets/datasets.rs
@@ -7,7 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use futures::TryStreamExt;
+use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer};
 use kamu_core::{
     DatasetRegistryExt,
     {self as domain},
@@ -16,6 +16,7 @@ use opendatafabric as odf;
 
 use crate::prelude::*;
 use crate::queries::*;
+use crate::utils::check_dataset_read_access;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -24,22 +25,39 @@ pub struct Datasets;
 #[Object]
 impl Datasets {
     const DEFAULT_PER_PAGE: usize = 15;
+    const DATASETS_CHUNK_SIZE: usize = 100;
 
-    /// Returns dataset by its ID
-    async fn by_id(&self, ctx: &Context<'_>, dataset_id: DatasetID) -> Result<Option<Dataset>> {
+    #[graphql(skip)]
+    async fn by_dataset_ref(
+        &self,
+        ctx: &Context<'_>,
+        dataset_ref: &odf::DatasetRef,
+    ) -> Result<Option<Dataset>> {
         let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry);
-        let hdl = dataset_registry
-            .try_resolve_dataset_handle_by_ref(&dataset_id.as_local_ref())
+        let maybe_handle = dataset_registry
+            .try_resolve_dataset_handle_by_ref(dataset_ref)
             .await?;
-        Ok(match hdl {
-            Some(h) => {
-                let account = Account::from_dataset_alias(ctx, &h.alias)
-                    .await?
-                    .expect("Account must exist");
-                Some(Dataset::new(account, h))
-            }
-            None => None,
-        })
+
+        let Some(handle) = maybe_handle else {
+            return Ok(None);
+        };
+
+        if check_dataset_read_access(ctx, &handle).await.is_err() {
+            return Ok(None);
+        }
+
+        let account = Account::from_dataset_alias(ctx, &handle.alias)
+            .await?
+            .expect("Account must exist");
+
+        Ok(Some(Dataset::new(account, handle)))
+    }
+
+    /// Returns dataset by its ID
+    async fn by_id(&self, ctx: &Context<'_>, dataset_id: DatasetID) -> Result<Option<Dataset>> {
+        let dataset_id: odf::DatasetID = dataset_id.into();
+
+        self.by_dataset_ref(ctx, &dataset_id.into_local_ref()).await
     }
 
     /// Returns dataset by its owner and name
@@ -49,22 +67,10 @@ impl Datasets {
         account_name: AccountName,
         dataset_name: DatasetName,
     ) -> Result<Option<Dataset>> {
-        let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry);
         let dataset_alias = odf::DatasetAlias::new(Some(account_name.into()), dataset_name.into());
-        let hdl = dataset_registry
-            .try_resolve_dataset_handle_by_ref(&dataset_alias.into_local_ref())
-            .await?;
-
-        Ok(match hdl {
-            Some(h) => {
-                let account = Account::from_dataset_alias(ctx, &h.alias)
-                    .await?
-                    .expect("Account must exist");
-                Some(Dataset::new(account, h))
-            }
-            None => None,
-        })
+
+        self.by_dataset_ref(ctx, &dataset_alias.into_local_ref())
+            .await
     }
 
     #[graphql(skip)]
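
The new private `by_dataset_ref` helper funnels both `byId` and `byOwnedAndName` through one resolve-then-authorize path, and it maps an authorization failure to `Ok(None)`: a dataset the caller may not read becomes indistinguishable from one that does not exist, so the API leaks nothing about private datasets. A toy sketch of that shape (all names here are stand-ins, not the real kamu API):

```rust
struct Handle {
    alias: String,
}

// Stand-in for `try_resolve_dataset_handle_by_ref`.
fn try_resolve(reference: &str) -> Option<Handle> {
    (reference == "alice/stats" || reference == "bob/secret").then(|| Handle {
        alias: reference.to_string(),
    })
}

// Stand-in for `check_dataset_read_access`: only `alice`'s data is readable.
fn check_read_access(handle: &Handle) -> Result<(), ()> {
    if handle.alias.starts_with("alice/") {
        Ok(())
    } else {
        Err(())
    }
}

fn by_dataset_ref(reference: &str) -> Option<Handle> {
    // Unresolved reference -> None.
    let handle = try_resolve(reference)?;
    // Resolved but unauthorized -> also None.
    check_read_access(&handle).ok()?;
    Some(handle)
}

fn main() {
    assert!(by_dataset_ref("alice/stats").is_some());
    // An existing-but-forbidden dataset and a missing one look identical.
    assert!(by_dataset_ref("bob/secret").is_none());
    assert!(by_dataset_ref("no/such").is_none());
}
```
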
@@ -75,25 +81,47 @@ impl Datasets {
         page: Option<usize>,
         per_page: Option<usize>,
     ) -> Result<DatasetConnection> {
-        let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry);
+        let (dataset_registry, dataset_action_authorizer) = from_catalog_n!(
+            ctx,
+            dyn domain::DatasetRegistry,
+            dyn DatasetActionAuthorizer
+        );
 
         let page = page.unwrap_or(0);
         let per_page = per_page.unwrap_or(Self::DEFAULT_PER_PAGE);
 
         let account_name = account_ref.account_name_internal();
 
-        let mut all_datasets: Vec<_> = dataset_registry
+        use futures::TryStreamExt;
+
+        let mut account_owned_datasets_stream = dataset_registry
             .all_dataset_handles_by_owner(&account_name.clone().into())
-            .try_collect()
-            .await?;
-        let total_count = all_datasets.len();
-        all_datasets.sort_by(|a, b| a.alias.cmp(&b.alias));
+            .try_chunks(Self::DATASETS_CHUNK_SIZE);
+        let mut accessible_datasets_handles = Vec::new();
+
+        while let Some(account_owned_dataset_handles_chunk) =
+            account_owned_datasets_stream.try_next().await.int_err()?
+        {
+            let authorized_handles = dataset_action_authorizer
+                .classify_datasets_by_allowance(
+                    account_owned_dataset_handles_chunk,
+                    DatasetAction::Read,
+                )
+                .await?
+                .authorized_handles;
+
+            accessible_datasets_handles.extend(authorized_handles);
+        }
 
-        let nodes = all_datasets
+        let total_count = accessible_datasets_handles.len();
+
+        accessible_datasets_handles.sort_by(|a, b| a.alias.cmp(&b.alias));
+
+        let nodes = accessible_datasets_handles
             .into_iter()
             .skip(page * per_page)
             .take(per_page)
-            .map(|hdl| Dataset::new(account_ref.clone(), hdl))
+            .map(|handle| Dataset::new(account_ref.clone(), handle))
             .collect();
 
         Ok(DatasetConnection::new(nodes, page, per_page, total_count))
@@ -114,22 +142,19 @@ impl Datasets {
             .find_account_name_by_id(&account_id)
             .await?;
 
-        match maybe_account_name {
-            Some(account_name) => {
-                self.by_account_impl(
-                    ctx,
-                    Account::new(account_id.into(), account_name.into()),
-                    page,
-                    per_page,
-                )
-                .await
-            }
-            None => {
-                let page = page.unwrap_or(0);
-                let per_page = per_page.unwrap_or(Self::DEFAULT_PER_PAGE);
-
-                Ok(DatasetConnection::new(vec![], page, per_page, 0))
-            }
+        if let Some(account_name) = maybe_account_name {
+            self.by_account_impl(
+                ctx,
+                Account::new(account_id.into(), account_name.into()),
+                page,
+                per_page,
+            )
+            .await
+        } else {
+            let page = page.unwrap_or(0);
+            let per_page = per_page.unwrap_or(Self::DEFAULT_PER_PAGE);
+
+            Ok(DatasetConnection::new(vec![], page, per_page, 0))
         }
     }
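
`by_account_impl` now drains the owner's dataset stream in `DATASETS_CHUNK_SIZE` batches and lets the authorizer classify each batch, so `total_count` and the page slice reflect only datasets the caller can read. Below is a runnable sketch of that chunking pattern, assuming the `futures` and `tokio` crates; the batch "authorizer" is a stand-in closure, not the real `DatasetActionAuthorizer`, and the chunking error is unpacked manually where the real code uses `.int_err()?`:

```rust
use futures::stream::{self, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    const CHUNK_SIZE: usize = 100;

    // Stand-in for `all_dataset_handles_by_owner`: a fallible stream of IDs.
    let handles = stream::iter((0..250u32).map(Ok::<u32, std::io::Error>));

    // Stand-in for `classify_datasets_by_allowance`: keep even IDs only.
    let authorize_batch = |batch: Vec<u32>| async move {
        batch.into_iter().filter(|id| id % 2 == 0).collect::<Vec<_>>()
    };

    let mut chunks = handles.try_chunks(CHUNK_SIZE);
    let mut accessible = Vec::new();

    // One authorizer round-trip per chunk of up to CHUNK_SIZE handles,
    // instead of one call per dataset. `TryChunksError`'s second field
    // carries the underlying stream error.
    while let Some(chunk) = chunks.try_next().await.map_err(|e| e.1)? {
        accessible.extend(authorize_batch(chunk).await);
    }

    assert_eq!(accessible.len(), 125); // the even IDs in 0..250
    Ok(())
}
```

Note the trade-off: every accessible handle is still collected before sorting and slicing the requested page, which keeps `total_count` exact but walks the owner's full collection; the chunking only bounds the size of each authorizer call.
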