Skip to content

Commit

Permalink
From impl OrcError to DataFusionError
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed May 12, 2024
1 parent 32629af commit 9104367
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
11 changes: 3 additions & 8 deletions file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::sync::Arc;

use crate::reader::metadata::read_metadata_async;
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{FileType, Statistics};
use datafusion::datasource::file_format::FileFormat;
Expand All @@ -18,20 +17,16 @@ use futures::TryStreamExt;

use async_trait::async_trait;
use futures_util::StreamExt;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};

use super::object_store_reader::ObjectStoreReader;
use super::physical_exec::OrcExec;

async fn fetch_schema(
store: &Arc<dyn ObjectStore>,
file: &ObjectMeta,
) -> Result<(object_store::path::Path, Schema)> {
async fn fetch_schema(store: &Arc<dyn ObjectStore>, file: &ObjectMeta) -> Result<(Path, Schema)> {
let loc_path = file.location.clone();
let mut reader = ObjectStoreReader::new(Arc::clone(store), file.clone());
let metadata = read_metadata_async(&mut reader)
.await
.map_err(ArrowError::from)?;
let metadata = read_metadata_async(&mut reader).await?;
let schema = metadata
.root_data_type()
.create_arrow_schema(&HashMap::default());
Expand Down
10 changes: 9 additions & 1 deletion mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use datafusion::dataframe::DataFrame;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::config::SessionConfig;
use datafusion::execution::context::{DataFilePaths, SessionContext, SessionState};
use datafusion::execution::options::ReadOptions;

use async_trait::async_trait;

use crate::error::OrcError;

use self::file_format::OrcFormat;

mod file_format;
Expand Down Expand Up @@ -122,6 +124,12 @@ impl SessionContextOrcExt for SessionContext {
}
}

impl From<OrcError> for DataFusionError {
fn from(value: OrcError) -> Self {
DataFusionError::External(Box::new(value))
}
}

#[cfg(test)]
mod tests {
use datafusion::assert_batches_sorted_eq;
Expand Down
1 change: 1 addition & 0 deletions object_store_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::{FutureExt, TryFutureExt};

use object_store::{ObjectMeta, ObjectStore};

/// Implements [`AsyncChunkReader`] to allow reading ORC files via the `object_store` API.
pub struct ObjectStoreReader {
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
Expand Down

0 comments on commit 9104367

Please sign in to comment.