From 7ab977ff4ba011f10dcf0c853fdd8822cfa9bf1e Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 15 Sep 2023 15:09:51 +0300 Subject: [PATCH] feat(native): Cube.py - support orchestratorOptions/http/jwt (#7135) --- packages/cubejs-api-gateway/src/gateway.ts | 2 +- packages/cubejs-api-gateway/src/types/auth.ts | 2 +- .../python/cube/src/conf/__init__.py | 21 ++++-- .../cubejs-backend-native/src/python/cross.rs | 4 ++ .../src/python/cube_config.rs | 66 ++++++------------- .../cubejs-backend-native/src/python/entry.rs | 2 +- .../src/core/optionsValidate.ts | 15 ++++- .../cubejs-server-core/src/core/server.ts | 2 +- packages/cubejs-server-core/src/core/types.ts | 2 +- packages/cubejs-server/src/server.ts | 1 + 10 files changed, 59 insertions(+), 58 deletions(-) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 2aae848d78b96..fa7ea741c4370 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -2123,7 +2123,7 @@ class ApiGateway { } const jwk = await jwks.getJWKbyKid( - typeof options.jwkUrl === 'function' ? options.jwkUrl(decoded) : options.jwkUrl, + typeof options.jwkUrl === 'function' ? await options.jwkUrl(decoded) : options.jwkUrl, decoded.header.kid ); if (!jwk) { diff --git a/packages/cubejs-api-gateway/src/types/auth.ts b/packages/cubejs-api-gateway/src/types/auth.ts index fa17b5f89299c..6fffa1c917b95 100644 --- a/packages/cubejs-api-gateway/src/types/auth.ts +++ b/packages/cubejs-api-gateway/src/types/auth.ts @@ -27,7 +27,7 @@ interface JWTOptions { // JWK options jwkRetry?: number, jwkDefaultExpire?: number, - jwkUrl?: ((payload: any) => string) | string, + jwkUrl?: ((payload: any) => string | Promise) | string, jwkRefetchWindow?: number, // JWT options diff --git a/packages/cubejs-backend-native/python/cube/src/conf/__init__.py b/packages/cubejs-backend-native/python/cube/src/conf/__init__.py index eef4fd3e7dbbf..7a3a0a2833531 100644 --- a/packages/cubejs-backend-native/python/cube/src/conf/__init__.py +++ b/packages/cubejs-backend-native/python/cube/src/conf/__init__.py @@ -40,6 +40,8 @@ class Configuration: cache_and_queue_driver: str allow_js_duplicate_props_in_schema: bool process_subscriptions_interval: int + http: Dict + jwt: Dict # Functions logger: Callable context_to_app_id: Union[str, Callable[[RequestContext], str]] @@ -53,9 +55,10 @@ class Configuration: scheduled_refresh_contexts: Callable context_to_api_scopes: Callable repository_factory: Callable - schema_version: Callable[[RequestContext], str] - semantic_layer_sync: Callable - pre_aggregations_schema: Callable[[RequestContext], str] + schema_version: Union[str, Callable[[RequestContext], str]] + semantic_layer_sync: Union[Dict, Callable[[], Dict]] + pre_aggregations_schema: Union[Callable[[RequestContext], str]] + orchestrator_options: Union[Dict, Callable[[RequestContext], Dict]] def __init__(self): self.schema_path = None @@ -67,6 +70,8 @@ def __init__(self): self.cache_and_queue_driver = None self.allow_js_duplicate_props_in_schema = None self.process_subscriptions_interval = None + self.http = None + self.jwt = None # Functions self.logger = None self.context_to_app_id = None @@ -84,6 +89,7 @@ def __init__(self): self.schema_version = None self.semantic_layer_sync = None self.pre_aggregations_schema = None + self.orchestrator_options = None def set_schema_path(self, schema_path: str): self.schema_path = schema_path @@ -148,15 +154,18 @@ def set_scheduled_refresh_contexts(self, scheduled_refresh_contexts: Callable): def set_repository_factory(self, repository_factory: Callable): self.repository_factory = repository_factory - def set_schema_version(self, schema_version: Callable[[RequestContext], str]): + def set_schema_version(self, schema_version: Union[str, Callable[[RequestContext], str]]): self.schema_version = schema_version - def set_semantic_layer_sync(self, semantic_layer_sync: Callable): + def set_semantic_layer_sync(self, semantic_layer_sync: Union[Dict, Callable[[], Dict]]): self.semantic_layer_sync = semantic_layer_sync - def set_pre_aggregations_schema(self, pre_aggregations_schema: Callable[[RequestContext], str]): + def set_pre_aggregations_schema(self, pre_aggregations_schema: Union[str, Callable[[RequestContext], str]]): self.pre_aggregations_schema = pre_aggregations_schema + def set_orchestrator_options(self, orchestrator_options: Union[Dict, Callable[[RequestContext], Dict]]): + self.orchestrator_options = orchestrator_options + settings = Configuration() diff --git a/packages/cubejs-backend-native/src/python/cross.rs b/packages/cubejs-backend-native/src/python/cross.rs index d87ca714bda3c..b9e54c5a62307 100644 --- a/packages/cubejs-backend-native/src/python/cross.rs +++ b/packages/cubejs-backend-native/src/python/cross.rs @@ -286,6 +286,10 @@ impl CLRepr { } Self::Tuple(r) + } else if v.get_type().is_subclass_of::()? { + let fun: Py = v.downcast::()?.into(); + + Self::PyFunction(fun) } else { return Err(PyErr::new::(format!( "Unable to represent {} type as CLR from Python", diff --git a/packages/cubejs-backend-native/src/python/cube_config.rs b/packages/cubejs-backend-native/src/python/cube_config.rs index ba540213e64df..3a7aa423e18d7 100644 --- a/packages/cubejs-backend-native/src/python/cube_config.rs +++ b/packages/cubejs-backend-native/src/python/cube_config.rs @@ -1,8 +1,6 @@ use convert_case::{Case, Casing}; use neon::prelude::*; -use pyo3::exceptions::PyTypeError; -use pyo3::types::PyFunction; -use pyo3::{Py, PyAny, PyErr, PyResult}; +use pyo3::{PyAny, PyResult}; use crate::python::cross::{CLRepr, CLReprObject}; @@ -28,56 +26,34 @@ impl CubeConfigPy { "cache_and_queue_driver", "allow_js_duplicate_props_in_schema", "process_subscriptions_interval", + "http", + "jwt", ] } pub fn apply_dynamic_functions(&mut self, config_module: &PyAny) -> PyResult<()> { - self.function_attr(config_module, "logger")?; - self.function_attr(config_module, "context_to_app_id")?; - self.function_attr(config_module, "context_to_orchestrator_id")?; - self.function_attr(config_module, "driver_factory")?; - self.function_attr(config_module, "db_type")?; - self.function_attr(config_module, "check_auth")?; - self.function_attr(config_module, "check_sql_auth")?; - self.function_attr(config_module, "can_switch_sql_user")?; - self.function_attr(config_module, "query_rewrite")?; - self.function_attr(config_module, "extend_context")?; - self.function_attr(config_module, "scheduled_refresh_contexts")?; - self.function_attr(config_module, "context_to_api_scopes")?; - self.function_attr(config_module, "repository_factory")?; - self.function_attr(config_module, "semantic_layer_sync")?; - self.function_attr(config_module, "schema_version")?; - self.function_attr(config_module, "pre_aggregations_schema")?; + self.attr(config_module, "logger")?; + self.attr(config_module, "context_to_app_id")?; + self.attr(config_module, "context_to_orchestrator_id")?; + self.attr(config_module, "driver_factory")?; + self.attr(config_module, "db_type")?; + self.attr(config_module, "check_auth")?; + self.attr(config_module, "check_sql_auth")?; + self.attr(config_module, "can_switch_sql_user")?; + self.attr(config_module, "query_rewrite")?; + self.attr(config_module, "extend_context")?; + self.attr(config_module, "scheduled_refresh_contexts")?; + self.attr(config_module, "context_to_api_scopes")?; + self.attr(config_module, "repository_factory")?; + self.attr(config_module, "semantic_layer_sync")?; + self.attr(config_module, "schema_version")?; + self.attr(config_module, "pre_aggregations_schema")?; + self.attr(config_module, "orchestrator_options")?; Ok(()) } - pub fn function_attr<'a>( - &mut self, - config_module: &'a PyAny, - key: &str, - ) -> PyResult>> { - let v = config_module.getattr(&*key)?; - if !v.is_none() { - if v.get_type().is_subclass_of::()? { - let cb = v.downcast::()?; - let py: Py = cb.into(); - - let value = CLRepr::PyFunction(py); - self.properties.insert(key.to_case(Case::Camel), value); - } else { - return Err(PyErr::new::(format!( - "Unsupported configuration type: {} for key: {}, must be a lambda", - v.get_type(), - key - ))); - } - } - - Ok(None) - } - - pub fn static_attr(&mut self, config_module: &PyAny, key: &str) -> PyResult<()> { + pub fn attr(&mut self, config_module: &PyAny, key: &str) -> PyResult<()> { let v = config_module.getattr(&*key)?; if !v.is_none() { let value = CLRepr::from_python_ref(v)?; diff --git a/packages/cubejs-backend-native/src/python/entry.rs b/packages/cubejs-backend-native/src/python/entry.rs index d067ae5254e87..f43b9a818f33e 100644 --- a/packages/cubejs-backend-native/src/python/entry.rs +++ b/packages/cubejs-backend-native/src/python/entry.rs @@ -37,7 +37,7 @@ fn python_load_config(mut cx: FunctionContext) -> JsResult { let mut cube_conf = CubeConfigPy::new(); for attr_name in cube_conf.get_static_attrs() { - cube_conf.static_attr(settings_py, attr_name)?; + cube_conf.attr(settings_py, attr_name)?; } cube_conf.apply_dynamic_functions(settings_py)?; diff --git a/packages/cubejs-server-core/src/core/optionsValidate.ts b/packages/cubejs-server-core/src/core/optionsValidate.ts index 4cb34c8c742d6..d4847604d720e 100644 --- a/packages/cubejs-server-core/src/core/optionsValidate.ts +++ b/packages/cubejs-server-core/src/core/optionsValidate.ts @@ -29,6 +29,17 @@ const jwtOptions = Joi.object().strict(true).keys({ claimsNamespace: Joi.string(), }); +const corsOptions = Joi.object().strict(true).keys({ + origin: Joi.any(), + methods: Joi.any(), + allowedHeaders: Joi.any(), + exposedHeaders: Joi.any(), + credentials: Joi.bool(), + maxAge: Joi.number(), + preflightContinue: Joi.bool(), + optionsSuccessStatus: Joi.bool(), +}); + const dbTypes = Joi.alternatives().try( Joi.string().valid(...Object.keys(DriverDependencies)), Joi.func() @@ -38,8 +49,8 @@ const schemaOptions = Joi.object().keys({ // server CreateOptions initApp: Joi.func(), webSockets: Joi.boolean(), - http: Joi.object().keys({ - cors: Joi.object(), + http: Joi.object().strict(true).keys({ + cors: corsOptions, }), gracefulShutdown: Joi.number().min(0).integer(), // Additional from WebSocketServerOptions diff --git a/packages/cubejs-server-core/src/core/server.ts b/packages/cubejs-server-core/src/core/server.ts index 34d2af82101c9..537bd775f1383 100644 --- a/packages/cubejs-server-core/src/core/server.ts +++ b/packages/cubejs-server-core/src/core/server.ts @@ -559,7 +559,7 @@ export class CubejsServerCore { const orchestratorOptions = this.optsHandler.getOrchestratorInitializedOptions( context, - this.orchestratorOptions(context) || {}, + (await this.orchestratorOptions(context)) || {}, ); const orchestratorApi = this.createOrchestratorApi( diff --git a/packages/cubejs-server-core/src/core/types.ts b/packages/cubejs-server-core/src/core/types.ts index 54aef637cc607..0f83a34f7f07e 100644 --- a/packages/cubejs-server-core/src/core/types.ts +++ b/packages/cubejs-server-core/src/core/types.ts @@ -125,7 +125,7 @@ export type DatabaseType = export type ContextToAppIdFn = (context: RequestContext) => string | Promise; export type ContextToOrchestratorIdFn = (context: RequestContext) => string | Promise; -export type OrchestratorOptionsFn = (context: RequestContext) => OrchestratorOptions; +export type OrchestratorOptionsFn = (context: RequestContext) => OrchestratorOptions | Promise; export type PreAggregationsSchemaFn = (context: RequestContext) => string | Promise; diff --git a/packages/cubejs-server/src/server.ts b/packages/cubejs-server/src/server.ts index 13b2a0fe33465..8c3006dd36aa5 100644 --- a/packages/cubejs-server/src/server.ts +++ b/packages/cubejs-server/src/server.ts @@ -61,6 +61,7 @@ export class CubejsServer { protected readonly status: ServerStatusHandler = new ServerStatusHandler(); public constructor(config: CreateOptions = {}, systemOptions?: SystemOptions) { + console.log(config); this.config = { ...config, webSockets: config.webSockets || getEnv('webSockets'),