Skip to content

Commit

Permalink
Fix reading multiple complex data types exception
Browse files Browse the repository at this point in the history
  • Loading branch information
harveyyue committed Dec 28, 2024
1 parent 4fd3453 commit b68bb5b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
10 changes: 10 additions & 0 deletions examples/datafusion_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ async fn main() -> Result<()> {
// methods available on SessionContext. With that done, we are able to process
// ORC files using SQL or the DataFrame API.
let ctx = SessionContext::new();

ctx.register_orc(
"table2",
"tests/basic/data/map_list.snappy.orc",
OrcReadOptions::default(),
)
.await?;

ctx.sql("select id,m,l from table2").await?.show().await?;

ctx.register_orc(
"table1",
"tests/basic/data/alltypes.snappy.orc",
Expand Down
17 changes: 13 additions & 4 deletions src/physical_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl ExecutionPlan for OrcExec {
projection,
batch_size: context.session_config().batch_size(),
_limit: self.config.limit,
_table_schema: self.config.file_schema.clone(),
table_schema: self.config.file_schema.clone(),
_metrics: self.metrics.clone(),
object_store,
};
Expand All @@ -142,7 +142,7 @@ struct OrcOpener {
projection: Vec<usize>,
batch_size: usize,
_limit: Option<usize>,
_table_schema: SchemaRef,
table_schema: SchemaRef,
_metrics: ExecutionPlanMetricsSet,
object_store: Arc<dyn ObjectStore>,
}
Expand All @@ -152,12 +152,21 @@ impl FileOpener for OrcOpener {
let reader =
ObjectStoreReader::new(self.object_store.clone(), file_meta.object_meta.clone());
let batch_size = self.batch_size;
// Offset by 1 since index 0 is the root
let projection = self.projection.iter().map(|i| i + 1).collect::<Vec<_>>();
let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?);

Ok(Box::pin(async move {
let mut builder = ArrowReaderBuilder::try_new_async(reader)
.await
.map_err(ArrowError::from)?;
// Find complex data type column index as projection
let mut projection = Vec::with_capacity(projected_schema.fields().len());
for named_column in builder.file_metadata().root_data_type().children() {
if let Some((_table_idx, _table_field)) =
projected_schema.fields().find(named_column.name())
{
projection.push(named_column.data_type().column_index());
}
}
let projection_mask =
ProjectionMask::roots(builder.file_metadata().root_data_type(), projection);
if let Some(range) = file_meta.range.clone() {
Expand Down
Binary file added tests/basic/data/map_list.snappy.orc
Binary file not shown.

0 comments on commit b68bb5b

Please sign in to comment.