diff --git a/datafusion-examples/examples/remote_catalog.rs b/datafusion-examples/examples/remote_catalog.rs index 5d136757abfb6..b887357ad0822 100644 --- a/datafusion-examples/examples/remote_catalog.rs +++ b/datafusion-examples/examples/remote_catalog.rs @@ -39,6 +39,7 @@ use datafusion::common::Result; use datafusion::prelude::SessionContext; use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; use datafusion::common::{internal_err, DataFusionError, TableReference}; +use datafusion_common::internal_datafusion_err; #[tokio::main] async fn main() -> Result<()> { @@ -46,16 +47,20 @@ async fn main() -> Result<()> { let ctx = SessionContext::new(); // Here we would make the connection to the remove catalog and configure it - let remote_catalog = RemoteCatalogInterface::connect() + let remote_catalog_interface = RemoteCatalogInterface::connect() .await?; - // wrap it in an Arc so we can give DataFusion a reference for resolutio - let remote_catalog = Arc::new(remote_catalog); + // wrap it in an Arc so we can give DataFusion a reference for resolution + let remote_catalog_interface = Arc::new(remote_catalog_interface); + + // Register a DataFusion SchemaProvideer for "remote_schema", which + // DataFusion will resolve tables in that schema + // `datafusion.remote_schema.remote_table` + let remote_schema = Arc::new(RemoteSchema::new(Arc::clone(&remote_catalog_interface))); + - // register the "remote_schema" schema with the default provider - // so DataFusion will resolve tables like `datafusion.remote_schema.remote_table` ctx.catalog("datafusion") - .ok_or_else(||internal_err!("default catalog was not installed"))? - .register_schema(Arc::clone(&remote_catalog))?; + .ok_or_else(||internal_datafusion_err!("default catalog was not installed"))? + .register_schema("remote_schema", remote_schema)?; let sql = "SELECT * from remote_schema.remote_table"; let results = ctx.sql(sql).await.unwrap_err(); @@ -75,8 +80,7 @@ async fn main() -> Result<()> { // tables. Fetching the information for all tables once per query is often // more efficient than fetching the information on demand when the planner // needs the information - remote_catalog.(&table_ref).await?; - }); + remote_catalog_interface.load_tables(references).await?; // Now plan the query after having fetched the remote table information let plan = state.statement_to_plan(statement).await?; @@ -107,13 +111,14 @@ impl RemoteCatalogInterface { pub async fn connect() -> Result { // In a real implementation this method might connect to a remote catalog, // validate credentials, cache information, etc - Ok(()) + Ok(Self {}) } /// Load information for the specified tables from the remote /// source - pub async fn load_tables(references: Vec { - todo!() + pub async fn load_tables(&self, references: Vec) -> Result<()>{ + todo!(); + Ok(()) } } @@ -128,9 +133,19 @@ struct RemoteTableInfo { /// #[derive(Debug)] struct RemoteSchema { - remote_catalog: Arc, + remote_catalog_interface: Arc, + /// Tables that have been preloaded } +impl RemoteSchema { + pub fn new(remote_catalog_interface: Arc) -> Self { + Self { + remote_catalog_interface + } + } +} + + #[async_trait] impl SchemaProvider for RemoteSchema { fn as_any(&self) -> &dyn Any {