Improve C Data Interface and Add Integration Testing Entrypoints (#5080)
* Add C Data Interface integration testing entrypoints

* Allow importing FFI_ArrowArray with existing datatype

* Clippy

* Use ptr::write

* Fix null_count for Null type

* Use new from_raw() APIs

* Address some review comments.

* Add unsafe markers

* Try to fix CI

* Revamp ArrowFile
pitrou authored Nov 20, 2023
1 parent 4d141a3 commit b724849
Showing 12 changed files with 363 additions and 137 deletions.
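The first two bullets describe exporting an array through the C Data Interface and importing it back, optionally reusing an already-known data type instead of a transported schema. Below is a minimal round-trip sketch using the `arrow::ffi` helpers (the crate's `ffi` feature must be enabled); treating `from_ffi_and_data_type` as the datatype-reusing import path is an assumption based on the commit message, not something shown in the diff itself.

```rust
use arrow::array::{make_array, Array, Int32Array};
use arrow::datatypes::DataType;
use arrow::error::Result;
use arrow::ffi::{from_ffi, from_ffi_and_data_type, to_ffi};

fn main() -> Result<()> {
    let array = Int32Array::from(vec![Some(1), None, Some(3)]);

    // Export: hand the array off as an FFI_ArrowArray / FFI_ArrowSchema pair.
    let (ffi_array, ffi_schema) = to_ffi(&array.to_data())?;

    // Import using the transported schema.
    let roundtrip = make_array(unsafe { from_ffi(ffi_array, &ffi_schema)? });
    assert_eq!(roundtrip.null_count(), 1);

    // Import while reusing a data type the consumer already knows,
    // so no FFI_ArrowSchema needs to cross the boundary.
    let (ffi_array, _) = to_ffi(&Int32Array::from(vec![4, 5]).to_data())?;
    let imported = make_array(unsafe { from_ffi_and_data_type(ffi_array, DataType::Int32)? });
    assert_eq!(imported.len(), 2);
    Ok(())
}
```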
8 changes: 7 additions & 1 deletion arrow-data/src/ffi.rs
@@ -168,6 +168,12 @@ impl FFI_ArrowArray {
             .collect::<Box<_>>();
         let n_children = children.len() as i64;
 
+        // As in the IPC format, emit null_count = length for Null type
+        let null_count = match data.data_type() {
+            DataType::Null => data.len(),
+            _ => data.null_count(),
+        };
+
         // create the private data owning everything.
         // any other data must be added here, e.g. via a struct, to track lifetime.
         let mut private_data = Box::new(ArrayPrivateData {
@@ -179,7 +185,7 @@
 
         Self {
             length: data.len() as i64,
-            null_count: data.null_count() as i64,
+            null_count: null_count as i64,
             offset: data.offset() as i64,
             n_buffers,
             n_children,
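The hunk above aligns C Data Interface export with the IPC convention: a Null-typed array advertises a null count equal to its length even though it carries no validity buffer, whereas the in-memory `NullArray` reports zero nulls. A small sketch of what a producer now emits, assuming `FFI_ArrowArray` keeps its public `null_count()` accessor:

```rust
use arrow::array::{Array, NullArray};
use arrow::ffi::to_ffi;

fn main() -> arrow::error::Result<()> {
    let nulls = NullArray::new(3);
    // In memory, NullArray has no validity bitmap and reports zero nulls.
    assert_eq!(nulls.null_count(), 0);

    // After this change, the exported struct reports null_count == length,
    // matching what the IPC writer emits for Null columns.
    let (exported, _schema) = to_ffi(&nulls.to_data())?;
    assert_eq!(exported.null_count(), 3);
    Ok(())
}
```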
5 changes: 4 additions & 1 deletion arrow-integration-testing/Cargo.toml
@@ -27,11 +27,14 @@ edition = { workspace = true }
 publish = false
 rust-version = { workspace = true }
 
+[lib]
+crate-type = ["lib", "cdylib"]
+
 [features]
 logging = ["tracing-subscriber"]
 
 [dependencies]
-arrow = { path = "../arrow", default-features = false, features = ["test_utils", "ipc", "ipc_compression", "json"] }
+arrow = { path = "../arrow", default-features = false, features = ["test_utils", "ipc", "ipc_compression", "json", "ffi"] }
 arrow-flight = { path = "../arrow-flight", default-features = false }
 arrow-buffer = { path = "../arrow-buffer", default-features = false }
 arrow-integration-test = { path = "../arrow-integration-test", default-features = false }
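Adding `cdylib` to `crate-type` lets the integration-testing crate be built as a shared library that the Archery harness can load dynamically, while `lib` keeps it usable from the crate's existing Rust binaries. The real exported entrypoints are defined elsewhere in this commit; the sketch below only illustrates the shape such a symbol takes, and the function name and error convention here are hypothetical.

```rust
use arrow::ffi::FFI_ArrowSchema;
use arrow_integration_testing::open_json_file;
use std::ffi::{c_char, CStr};

// Hypothetical C-callable entrypoint: read an Arrow JSON file and hand its
// schema to the caller through the C Data Interface. Returns 0 on success.
#[no_mangle]
pub unsafe extern "C" fn example_export_schema_from_json(
    json_path: *const c_char,
    out: *mut FFI_ArrowSchema,
) -> i32 {
    let Ok(path) = CStr::from_ptr(json_path).to_str() else {
        return 1;
    };
    match open_json_file(path).and_then(|f| FFI_ArrowSchema::try_from(&f.schema)) {
        Ok(schema) => {
            // Move the exported schema into caller-provided memory without
            // reading the uninitialized bytes already there (cf. "Use ptr::write").
            std::ptr::write(out, schema);
            0
        }
        Err(_) => 1,
    }
}
```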
2 changes: 1 addition & 1 deletion arrow-integration-testing/README.md
@@ -48,7 +48,7 @@ ln -s <path_to_arrow_rs> arrow/rust
 
 ```shell
 cd arrow
-pip install -e dev/archery[docker]
+pip install -e dev/archery[integration]
 ```
 
 ### Build the C++ binaries:
49 changes: 5 additions & 44 deletions arrow-integration-testing/src/bin/arrow-json-integration-test.rs
@@ -15,16 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::{DataType, Field};
-use arrow::datatypes::{Fields, Schema};
 use arrow::error::{ArrowError, Result};
 use arrow::ipc::reader::FileReader;
 use arrow::ipc::writer::FileWriter;
 use arrow_integration_test::*;
-use arrow_integration_testing::read_json_file;
+use arrow_integration_testing::{canonicalize_schema, open_json_file};
 use clap::Parser;
 use std::fs::File;
-use std::sync::Arc;
 
 #[derive(clap::ValueEnum, Debug, Clone)]
 #[clap(rename_all = "SCREAMING_SNAKE_CASE")]
@@ -66,12 +63,12 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>
         eprintln!("Converting {json_name} to {arrow_name}");
     }
 
-    let json_file = read_json_file(json_name)?;
+    let json_file = open_json_file(json_name)?;
 
     let arrow_file = File::create(arrow_name)?;
     let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;
 
-    for b in json_file.batches {
+    for b in json_file.read_batches()? {
         writer.write(&b)?;
     }
 
@@ -113,49 +110,13 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>
     Ok(())
 }
 
-fn canonicalize_schema(schema: &Schema) -> Schema {
-    let fields = schema
-        .fields()
-        .iter()
-        .map(|field| match field.data_type() {
-            DataType::Map(child_field, sorted) => match child_field.data_type() {
-                DataType::Struct(fields) if fields.len() == 2 => {
-                    let first_field = fields.get(0).unwrap();
-                    let key_field =
-                        Arc::new(Field::new("key", first_field.data_type().clone(), false));
-                    let second_field = fields.get(1).unwrap();
-                    let value_field = Arc::new(Field::new(
-                        "value",
-                        second_field.data_type().clone(),
-                        second_field.is_nullable(),
-                    ));
-
-                    let fields = Fields::from([key_field, value_field]);
-                    let struct_type = DataType::Struct(fields);
-                    let child_field = Field::new("entries", struct_type, false);
-
-                    Arc::new(Field::new(
-                        field.name().as_str(),
-                        DataType::Map(Arc::new(child_field), *sorted),
-                        field.is_nullable(),
-                    ))
-                }
-                _ => panic!("The child field of Map type should be Struct type with 2 fields."),
-            },
-            _ => field.clone(),
-        })
-        .collect::<Fields>();
-
-    Schema::new(fields).with_metadata(schema.metadata().clone())
-}
-
 fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
     if verbose {
         eprintln!("Validating {arrow_name} and {json_name}");
     }
 
     // open JSON file
-    let json_file = read_json_file(json_name)?;
+    let json_file = open_json_file(json_name)?;
 
     // open Arrow file
     let arrow_file = File::open(arrow_name)?;
@@ -170,7 +131,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
         )));
     }
 
-    let json_batches = &json_file.batches;
+    let json_batches = json_file.read_batches()?;
 
     // compare number of batches
     assert!(
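Throughout this file, `read_json_file` (which eagerly materialized every batch into an `ArrowFile { schema, batches, .. }`) is replaced by `open_json_file`, whose `ArrowFile` exposes the schema up front and decodes record batches on demand via `read_batches()` (the commit's "Revamp ArrowFile" bullet), with `canonicalize_schema` now imported from the crate instead of being defined locally. A minimal usage sketch, assuming the crate-level `canonicalize_schema` keeps the `&Schema -> Schema` signature of the local copy removed above:

```rust
use arrow::error::Result;
use arrow::ipc::reader::FileReader;
use arrow_integration_testing::{canonicalize_schema, open_json_file};
use std::fs::File;

/// Check that an Arrow IPC file matches the schema of its JSON counterpart,
/// normalizing Map field names ("entries"/"key"/"value") on both sides first.
fn schemas_match(arrow_path: &str, json_path: &str) -> Result<bool> {
    let json_file = open_json_file(json_path)?;
    let reader = FileReader::try_new(File::open(arrow_path)?, None)?;
    Ok(canonicalize_schema(&reader.schema()) == canonicalize_schema(&json_file.schema))
}
```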
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{read_json_file, ArrowFile};
+use crate::open_json_file;
 use std::collections::HashMap;
 
 use arrow::{
@@ -45,23 +45,16 @@ pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
 
     let client = FlightServiceClient::connect(url).await?;
 
-    let ArrowFile {
-        schema, batches, ..
-    } = read_json_file(path)?;
+    let json_file = open_json_file(path)?;
 
-    let schema = Arc::new(schema);
+    let batches = json_file.read_batches()?;
+    let schema = Arc::new(json_file.schema);
 
     let mut descriptor = FlightDescriptor::default();
     descriptor.set_type(DescriptorType::Path);
     descriptor.path = vec![path.to_string()];
 
-    upload_data(
-        client.clone(),
-        schema.clone(),
-        descriptor.clone(),
-        batches.clone(),
-    )
-    .await?;
+    upload_data(client.clone(), schema, descriptor.clone(), batches.clone()).await?;
     verify_data(client, descriptor, &batches).await?;
 
     Ok(())