Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 10, 2024
1 parent b59926d commit 13d698e
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions datafusion-examples/examples/remote_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,28 @@ use datafusion::common::Result;
use datafusion::prelude::SessionContext;
use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
use datafusion::common::{internal_err, DataFusionError, TableReference};
use datafusion_common::internal_datafusion_err;

#[tokio::main]
async fn main() -> Result<()> {
// We create a session context to interact
let ctx = SessionContext::new();

// Here we would make the connection to the remote catalog and configure it
let remote_catalog = RemoteCatalogInterface::connect()
let remote_catalog_interface = RemoteCatalogInterface::connect()
.await?;
// wrap it in an Arc so we can give DataFusion a reference for resolutio
let remote_catalog = Arc::new(remote_catalog);
// wrap it in an Arc so we can give DataFusion a reference for resolution
let remote_catalog_interface = Arc::new(remote_catalog_interface);

// Register a DataFusion SchemaProvider for "remote_schema", which
// DataFusion will use to resolve tables in that schema, such as
// `datafusion.remote_schema.remote_table`
let remote_schema = Arc::new(RemoteSchema::new(Arc::clone(&remote_catalog_interface)));


// register the "remote_schema" schema with the default provider
// so DataFusion will resolve tables like `datafusion.remote_schema.remote_table`
ctx.catalog("datafusion")
.ok_or_else(||internal_err!("default catalog was not installed"))?
.register_schema(Arc::clone(&remote_catalog))?;
.ok_or_else(||internal_datafusion_err!("default catalog was not installed"))?
.register_schema("remote_schema", remote_schema)?;

let sql = "SELECT * from remote_schema.remote_table";
let results = ctx.sql(sql).await.unwrap_err();
Expand All @@ -75,8 +80,7 @@ async fn main() -> Result<()> {
// tables. Fetching the information for all tables once per query is often
// more efficient than fetching the information on demand when the planner
// needs the information
remote_catalog.(&table_ref).await?;
});
remote_catalog_interface.load_tables(references).await?;

// Now plan the query after having fetched the remote table information
let plan = state.statement_to_plan(statement).await?;
Expand Down Expand Up @@ -107,13 +111,14 @@ impl RemoteCatalogInterface {
pub async fn connect() -> Result<Self> {
// In a real implementation this method might connect to a remote catalog,
// validate credentials, cache information, etc
Ok(())
Ok(Self {})
}

/// Load information for the specified tables from the remote
/// source
pub async fn load_tables(references: Vec<TableReference> {
todo!()
pub async fn load_tables(&self, references: Vec<TableReference>) -> Result<()>{
todo!();
Ok(())
}

}
Expand All @@ -128,9 +133,19 @@ struct RemoteTableInfo {
///
#[derive(Debug)]
struct RemoteSchema {
remote_catalog: Arc<RemoteCatalogInterface>,
remote_catalog_interface: Arc<RemoteCatalogInterface>,
/// Tables that have been preloaded
}

impl RemoteSchema {
pub fn new(remote_catalog_interface: Arc<RemoteCatalogInterface>) -> Self {
Self {
remote_catalog_interface
}
}
}


#[async_trait]
impl SchemaProvider for RemoteSchema {
fn as_any(&self) -> &dyn Any {
Expand Down

0 comments on commit 13d698e

Please sign in to comment.