From e4aacd86521c2bbbd666127470f3f7df07d44cca Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 20 Nov 2024 10:33:45 -0500 Subject: [PATCH] add example for http access --- datafusion-examples/examples/thread_pools.rs | 52 +++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs index 66c85d80fea6b..b9dc3e059992c 100644 --- a/datafusion-examples/examples/thread_pools.rs +++ b/datafusion-examples/examples/thread_pools.rs @@ -25,13 +25,15 @@ //! due to congestion control and increased latencies for processing network //! messages. +use std::sync::Arc; use arrow::util::pretty::pretty_format_batches; -use datafusion::common::runtime::dedicated_executor::DedicatedExecutor; use datafusion::common::runtime::DedicatedExecutorBuilder; use datafusion::error::Result; use datafusion::execution::SendableRecordBatchStream; use datafusion::prelude::*; use futures::stream::StreamExt; +use object_store::http::HttpBuilder; +use url::Url; /// Normally, you don't need to worry about the details of the tokio runtime, /// but for this example it is important to understand how the [`Runtime`]s work. @@ -44,10 +46,9 @@ use futures::stream::StreamExt; /// are run). #[tokio::main] async fn main() -> Result<()> { - let ctx = SessionContext::new() - // enabling URL table means we can select directly from - // paths in SQL queries. - .enable_url_table(); + // The first two examples only do local file IO. Enable the URL table so we + // can select directly from filenames in SQL. 
+ let ctx = SessionContext::new().enable_url_table(); let sql = format!( "SELECT * FROM '{}/alltypes_plain.parquet'", datafusion::test_util::parquet_test_data() @@ -64,7 +65,7 @@ async fn main() -> Result<()> { different_runtime_basic(ctx, sql).await?; // Run the same query on a different runtime including remote IO - different_runtime_advanced(ctx, sql).await?; + different_runtime_advanced().await?; Ok(()) } @@ -140,13 +141,40 @@ async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> Ok(()) } -/// Demonstrates how to run queries on a **different** runtime than the current one but -/// run IO operations on the current runtime +/// Demonstrates how to run queries on a different runtime than the current one +/// and how to handle IO operations. +/// -/// this is run on a new, separate runtime -async fn different_runtime_inner(ctx: SessionContext, sql: String) -> Result<()> { - // - // You can use the DataFusion "DedicatedExecutor" in this case to handle most of this automatically for you. 
+async fn different_runtime_advanced() -> Result<()> { + // In this example, we will configure access to a remote object store + // over the network during the plan + + let ctx = SessionContext::new() + .enable_url_table(); + + // set up the HTTP object store + let base_url = Url::parse("https://github.com").unwrap(); + let http_store = HttpBuilder::new() + .with_url(base_url.clone()) + .build() + .unwrap(); + ctx.register_object_store(&base_url, Arc::new(http_store)); + + // register csv file with the execution context + ctx.register_csv( + "aggregate_test_100", + "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv", + CsvReadOptions::new(), + ) + .await?; + + // execute the query + let df = ctx + .sql("SELECT c1,c2,c3 FROM aggregate_test_100 LIMIT 5") + .await?; + + // print the results + df.show().await?; // Note that special care must be taken when running Datafusion plans that do async io to transfer the work to their own thread pools.