Skip to content

Commit

Permalink
Merge branch 'main' into pyo3-0.23
Browse files Browse the repository at this point in the history
  • Loading branch information
bschoenmaeckers committed Dec 4, 2024
2 parents 8aa59a8 + bcfa7ec commit db2410f
Show file tree
Hide file tree
Showing 104 changed files with 2,587 additions and 1,338 deletions.
109 changes: 107 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ serde_json = "1"
simd-json = { version = "0.14", features = ["known-key"] }
simdutf8 = "0.1.4"
slotmap = "1"
sqlparser = "0.49"
sqlparser = "0.52"
stacker = "0.1"
streaming-iterator = "0.1.9"
strength_reduce = "0.2"
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/io/ipc/append/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl<R: Read + Seek + Write> FileWriter<R> {
cannot_replace: true,
},
encoded_message: Default::default(),
custom_schema_metadata: None,
})
}
}
8 changes: 6 additions & 2 deletions crates/polars-arrow/src/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::common::*;
use super::schema::fb_to_schema;
use super::{Dictionaries, OutOfSpecKind, SendableIterator};
use crate::array::Array;
use crate::datatypes::ArrowSchemaRef;
use crate::datatypes::{ArrowSchemaRef, Metadata};
use crate::io::ipc::IpcSchema;
use crate::record_batch::RecordBatchT;

Expand All @@ -21,6 +21,9 @@ pub struct FileMetadata {
/// The schema that is read from the file footer
pub schema: ArrowSchemaRef,

/// The custom metadata that is read from the schema
pub custom_schema_metadata: Option<Arc<Metadata>>,

/// The files' [`IpcSchema`]
pub ipc_schema: IpcSchema,

Expand Down Expand Up @@ -245,14 +248,15 @@ pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMet
.map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())
.transpose()?;
let ipc_schema = deserialize_schema_ref_from_footer(footer)?;
let (schema, ipc_schema) = fb_to_schema(ipc_schema)?;
let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(ipc_schema)?;

Ok(FileMetadata {
schema: Arc::new(schema),
ipc_schema,
blocks,
dictionaries,
size,
custom_schema_metadata: custom_schema_metadata.map(Arc::new),
})
}

Expand Down
30 changes: 27 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ fn get_dtype(
}

/// Deserialize an flatbuffers-encoded Schema message into [`ArrowSchema`] and [`IpcSchema`].
pub fn deserialize_schema(message: &[u8]) -> PolarsResult<(ArrowSchema, IpcSchema)> {
pub fn deserialize_schema(
message: &[u8],
) -> PolarsResult<(ArrowSchema, IpcSchema, Option<Metadata>)> {
let message = arrow_format::ipc::MessageRef::read_as_root(message)
.map_err(|_err| polars_err!(oos = "Unable deserialize message: {err:?}"))?;

Expand All @@ -374,7 +376,7 @@ pub fn deserialize_schema(message: &[u8]) -> PolarsResult<(ArrowSchema, IpcSchem
/// Deserialize the raw Schema table from IPC format to Schema data type
pub(super) fn fb_to_schema(
schema: arrow_format::ipc::SchemaRef,
) -> PolarsResult<(ArrowSchema, IpcSchema)> {
) -> PolarsResult<(ArrowSchema, IpcSchema, Option<Metadata>)> {
let fields = schema
.fields()?
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingFields))?;
Expand All @@ -393,12 +395,33 @@ pub(super) fn fb_to_schema(
arrow_format::ipc::Endianness::Big => false,
};

let custom_schema_metadata = match schema.custom_metadata()? {
None => None,
Some(metadata) => {
let metadata: Metadata = metadata
.into_iter()
.filter_map(|kv_result| {
// FIXME: silently hiding errors here
let kv_ref = kv_result.ok()?;
Some((kv_ref.key().ok()??.into(), kv_ref.value().ok()??.into()))
})
.collect();

if metadata.is_empty() {
None
} else {
Some(metadata)
}
},
};

Ok((
arrow_schema,
IpcSchema {
fields: ipc_fields,
is_little_endian,
},
custom_schema_metadata,
))
}

Expand All @@ -415,11 +438,12 @@ pub(super) fn deserialize_stream_metadata(meta: &[u8]) -> PolarsResult<StreamMet
} else {
polars_bail!(oos = "The first IPC message of the stream must be a schema")
};
let (schema, ipc_schema) = fb_to_schema(schema)?;
let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(schema)?;

Ok(StreamMetadata {
schema,
version,
ipc_schema,
custom_schema_metadata,
})
}
5 changes: 4 additions & 1 deletion crates/polars-arrow/src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::common::*;
use super::schema::deserialize_stream_metadata;
use super::{Dictionaries, OutOfSpecKind};
use crate::array::Array;
use crate::datatypes::ArrowSchema;
use crate::datatypes::{ArrowSchema, Metadata};
use crate::io::ipc::IpcSchema;
use crate::record_batch::RecordBatchT;

Expand All @@ -18,6 +18,9 @@ pub struct StreamMetadata {
/// The schema that is read from the stream's first message
pub schema: ArrowSchema,

/// The custom metadata that is read from the schema
pub custom_schema_metadata: Option<Metadata>,

/// The IPC version of the stream
pub version: arrow_format::ipc::MetadataVersion,

Expand Down
17 changes: 14 additions & 3 deletions crates/polars-arrow/src/io/ipc/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ use crate::datatypes::{
use crate::io::ipc::endianness::is_native_little_endian;

/// Converts a [ArrowSchema] and [IpcField]s to a flatbuffers-encoded [arrow_format::ipc::Message].
pub fn schema_to_bytes(schema: &ArrowSchema, ipc_fields: &[IpcField]) -> Vec<u8> {
let schema = serialize_schema(schema, ipc_fields);
pub fn schema_to_bytes(
schema: &ArrowSchema,
ipc_fields: &[IpcField],
custom_metadata: Option<&Metadata>,
) -> Vec<u8> {
let schema = serialize_schema(schema, ipc_fields, custom_metadata);

let message = arrow_format::ipc::Message {
version: arrow_format::ipc::MetadataVersion::V5,
Expand All @@ -24,6 +28,7 @@ pub fn schema_to_bytes(schema: &ArrowSchema, ipc_fields: &[IpcField]) -> Vec<u8>
pub fn serialize_schema(
schema: &ArrowSchema,
ipc_fields: &[IpcField],
custom_schema_metadata: Option<&Metadata>,
) -> arrow_format::ipc::Schema {
let endianness = if is_native_little_endian() {
arrow_format::ipc::Endianness::Little
Expand All @@ -37,7 +42,13 @@ pub fn serialize_schema(
.map(|(field, ipc_field)| serialize_field(field, ipc_field))
.collect::<Vec<_>>();

let custom_metadata = None;
let custom_metadata = custom_schema_metadata.and_then(|custom_meta| {
let as_kv = custom_meta
.iter()
.map(|(key, val)| key_value(key.clone().into_string(), val.clone().into_string()))
.collect::<Vec<_>>();
(!as_kv.is_empty()).then_some(as_kv)
});

arrow_format::ipc::Schema {
endianness,
Expand Down
Loading

0 comments on commit db2410f

Please sign in to comment.