Skip to content

Commit

Permalink
add example for http access
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 20, 2024
1 parent cccecf1 commit e4aacd8
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions datafusion-examples/examples/thread_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
//! due to congestion control and increased latencies for processing network
//! messages.
use std::sync::Arc;
use arrow::util::pretty::pretty_format_batches;
use datafusion::common::runtime::dedicated_executor::DedicatedExecutor;
use datafusion::common::runtime::DedicatedExecutorBuilder;
use datafusion::error::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::*;
use futures::stream::StreamExt;
use object_store::http::HttpBuilder;
use url::Url;

/// Normally, you don't need to worry about the details of the tokio runtime,
/// but for this example it is important to understand how the [`Runtime`]s work.
Expand All @@ -44,10 +46,9 @@ use futures::stream::StreamExt;
/// are run).
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new()
// enabling URL table means we can select directly from
// paths in SQL queries.
.enable_url_table();
// The first two examples only do local file IO. Enable the URL table so we
// can select directly from filenames in SQL.
let ctx = SessionContext::new().enable_url_table();
let sql = format!(
"SELECT * FROM '{}/alltypes_plain.parquet'",
datafusion::test_util::parquet_test_data()
Expand All @@ -64,7 +65,7 @@ async fn main() -> Result<()> {
different_runtime_basic(ctx, sql).await?;

// Run the same query on a different runtime including remote IO
different_runtime_advanced(ctx, sql).await?;
different_runtime_advanced().await?;

Ok(())
}
Expand Down Expand Up @@ -140,13 +141,40 @@ async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()>
Ok(())
}

/// Demonstrates how to run queries on a **different** runtime than the current one but
/// run IO operations on the current runtime
/// Demonstrates how to run queries on a different runtime than the current one
/// and how to handle IO operations.
///
/// this is run on a new, separate runtime
async fn different_runtime_inner(ctx: SessionContext, sql: String) -> Result<()> {
//
// You can use the DataFusion "DedicatedExecutor" in this case to handle most of this automatically for you.
async fn different_runtime_advanced() -> Result<()> {
// In this example, we will configure access to a remote object store
// over the network during the plan

let ctx = SessionContext::new()
.enable_url_table();

// setup http object store
let base_url = Url::parse("https://github.com").unwrap();
let http_store = HttpBuilder::new()
.with_url(base_url.clone())
.build()
.unwrap();
ctx.register_object_store(&base_url, Arc::new(http_store));

// register csv file with the execution context
ctx.register_csv(
"aggregate_test_100",
"https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv",
CsvReadOptions::new(),
)
.await?;

// execute the query
let df = ctx
.sql("SELECT c1,c2,c3 FROM aggregate_test_100 LIMIT 5")
.await?;

// print the results
df.show().await?;

// Note that when running DataFusion plans that do async IO, special care must be taken to transfer the work to their own thread pools.

Expand Down

0 comments on commit e4aacd8

Please sign in to comment.